use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use async_trait::async_trait;
use chrono::{Duration, Utc};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use crate::events::{
ApprovalDeniedData, AssistantResponseData, DecisionPointData, DecisionType, ErrorData,
ErrorType, Event, EventStore, EventType, FailureCategory, RootCauseCandidate, TaskEndData,
TaskStartData, ThinkingStartData, ToolCallData, ToolResultData,
};
use crate::llm_runtime::SharedLlmRuntime;
use crate::tools::sanitize::redact_secrets;
use crate::traits::{ProviderResponse, TokenUsageStore, Tool, ToolCapabilities};
use crate::utils::truncate_str;
/// Diagnostic tool that reads recorded task events and renders task lists,
/// timelines, and root-cause diagnoses from them.
pub struct DiagnoseTool {
/// Store queried for task-end events, per-task events, and decision points.
event_store: Arc<EventStore>,
/// Token-usage store handed to background model-call telemetry.
// NOTE(review): this field is read when telemetry is recorded for the LLM
// ranking call in this file, so the `allow` below may be stale — confirm
// before removing it.
#[allow(dead_code)]
state: Arc<dyn TokenUsageStore>,
/// Runtime handle used to obtain the provider and fast model for LLM ranking.
llm_runtime: SharedLlmRuntime,
/// Upper bound on events kept in a task timeline; `new` clamps it to 50..=1000.
max_events: usize,
/// When true, tool-call briefs show the raw arguments instead of the stored summary.
include_raw_args: bool,
}
// Tuning constants for route-health and context-scope alerting.
// NOTE(review): the code that reads these constants is not visible in this
// part of the file; the per-constant notes are inferred from the names only
// and should be confirmed against their consumers.

// Presumably how many days of history are considered — confirm.
const ROUTE_ALERT_LOOKBACK_DAYS: i64 = 7;
// Presumably the number of most recent samples forming the "recent" window — confirm.
const ROUTE_ALERT_RECENT_WINDOW: usize = 30;
// Presumably the minimum number of baseline samples required before alerting — confirm.
const ROUTE_ALERT_MIN_BASELINE_WINDOW: usize = 30;
// Presumably the rate change (as a fraction) that triggers a tools-required alert — confirm.
const ROUTE_ALERT_TOOLS_REQUIRED_RATE_DELTA_THRESHOLD: f64 = 0.25;
// Presumably the sample window used to judge a sustained clarification rate — confirm.
const ROUTE_ALERT_CLARIFICATION_SUSTAINED_WINDOW: usize = 12;
// Presumably the minimum baseline sample count for clarification alerts — confirm.
const ROUTE_ALERT_CLARIFICATION_MIN_BASELINE: usize = 20;
// Presumably the clarification-rate change (as a fraction) that triggers an alert — confirm.
const ROUTE_ALERT_CLARIFICATION_RATE_DELTA_THRESHOLD: f64 = 0.20;
// Presumably the recent-vs-baseline multiple that counts as a spike — confirm.
const ROUTE_ALERT_CLARIFICATION_SPIKE_MULTIPLIER: f64 = 2.0;
// Presumably the minimum sustained clarification rate that triggers an alert — confirm.
const ROUTE_ALERT_CLARIFICATION_MIN_SUSTAINED_RATE: f64 = 0.50;
// Presumably the guard-intervention count at which a context/scope alert is raised — confirm.
const CONTEXT_SCOPE_ALERT_THRESHOLD: u64 = 1;
/// One sampled intent-gate routing record.
///
/// NOTE(review): construction and use of this type are outside the visible
/// part of the file; the field notes are inferred from names — confirm.
#[derive(Debug, Clone)]
struct IntentGateRouteSample {
/// UTC timestamp of the sample.
created_at: chrono::DateTime<Utc>,
/// Presumably the reason code attached to the routing decision — confirm.
route_reason: String,
/// Presumably the action the route resolved to — confirm.
route_action: String,
/// Presumably the length of the route's reply, when one was recorded — confirm.
route_reply_len: Option<usize>,
}
/// Condensed view of a task's decision-point events, filled in by
/// `build_execution_replay_summary`.
///
/// The `Vec<String>` fields hold human-readable lines rendered in the
/// "Execution Replay Summary" section; the `*_evidence` fields hold event
/// references that back root-cause candidates.
#[derive(Debug, Default, Clone)]
struct ExecutionReplaySummary {
/// Lines describing planning-gate and critique-pass decisions.
plan_revisions: Vec<String>,
/// Lines describing evidence-gate blocks.
evidence_gate_triggers: Vec<String>,
/// Lines describing validation blocks, selected failure classifications,
/// and execution-budget exhaustion.
validation_failures: Vec<String>,
/// Lines for decision points that carry a `loop_repetition_reason`.
retry_reasons: Vec<String>,
/// Evidence for rejected/unavailable plans or critiques and for
/// scope/contract violations.
plan_failure_evidence: Vec<crate::events::EvidenceRef>,
/// Evidence for evidence-gate decisions.
evidence_failure_evidence: Vec<crate::events::EvidenceRef>,
/// Evidence for post-execution validation decisions and budget exhaustion.
validation_failure_evidence: Vec<crate::events::EvidenceRef>,
}
impl DiagnoseTool {
/// Creates a diagnose tool over the given event store, token-usage state,
/// and LLM runtime.
///
/// `max_events` is clamped into `50..=1000` before it is stored; every other
/// argument is stored unchanged.
pub fn new(
    event_store: Arc<EventStore>,
    state: Arc<dyn TokenUsageStore>,
    llm_runtime: SharedLlmRuntime,
    max_events: usize,
    include_raw_args: bool,
) -> Self {
    // Keep the timeline cap within a sane range regardless of configuration.
    let max_events = max_events.clamp(50, 1000);
    Self {
        event_store,
        state,
        llm_runtime,
        max_events,
        include_raw_args,
    }
}
/// Lists the most recent finished tasks of a session as a Markdown bullet list.
///
/// `limit` is clamped to `1..=50` before querying and `failures_only` is
/// forwarded to the store query. Rows whose payload does not parse as
/// `TaskEndData` are skipped; the `Count` header still reports the number of
/// rows returned by the store.
///
/// # Errors
/// Propagates any error returned by the event-store query.
async fn list_tasks(
    &self,
    session_id: &str,
    limit: usize,
    failures_only: bool,
) -> anyhow::Result<String> {
    let rows = self
        .event_store
        .query_recent_task_ends(session_id, failures_only, limit.clamp(1, 50))
        .await?;
    if rows.is_empty() {
        return Ok("No matching tasks found in this session.".to_string());
    }
    let mut out = format!(
        "**Recent Tasks**\n\n- Session: {}\n- Count: {}\n- Failures only: {}\n\n",
        session_id,
        rows.len(),
        failures_only
    );
    for event in rows {
        let Ok(end) = event.parse_data::<TaskEndData>() else {
            continue;
        };
        let status = end.status.as_str();
        let outcome = end.effective_outcome().as_str();
        // Redact BEFORE truncating: cutting the text first can split a secret
        // at the 120-character boundary, leaving a fragment that no longer
        // matches the redaction patterns and would be emitted verbatim.
        let err = end
            .error
            .as_ref()
            .map(|s| truncate_str(&redact_secrets(s), 120))
            .unwrap_or_else(|| "-".to_string());
        out.push_str(&format!(
            "- `{}` status={} outcome={} duration={}s iterations={} tool_calls={} at {}\n error: {}\n",
            end.task_id,
            status,
            outcome,
            end.duration_secs,
            end.iterations,
            end.tool_calls_count,
            event.created_at.to_rfc3339(),
            err
        ));
    }
    Ok(out.trim_end().to_string())
}
/// Loads every event of a task plus its decision points, merged into one
/// chronological, de-duplicated timeline.
///
/// Events are ordered by creation time and then by id, duplicates (same id)
/// are dropped, and only the newest `max_events` entries are kept.
///
/// # Errors
/// Propagates errors from either event-store query.
async fn collect_task_timeline(
    &self,
    session_id: &str,
    task_id: &str,
) -> anyhow::Result<Vec<Event>> {
    let store = &self.event_store;
    let mut timeline = store
        .query_task_events_for_session(session_id, task_id)
        .await?;
    let decision_points = store.query_decision_points(session_id, task_id).await?;
    timeline.extend(decision_points);

    timeline.sort_by(|lhs, rhs| {
        lhs.created_at
            .cmp(&rhs.created_at)
            .then_with(|| lhs.id.cmp(&rhs.id))
    });

    // Both queries may return the same event; keep the first occurrence only.
    let mut seen_ids = HashSet::new();
    timeline.retain(|event| seen_ids.insert(event.id));

    // Drop the oldest entries so that at most `max_events` remain.
    let overflow = timeline.len().saturating_sub(self.max_events);
    if overflow > 0 {
        timeline.drain(..overflow);
    }
    Ok(timeline)
}
fn brief_for_event(&self, event: &Event) -> String {
match event.event_type {
EventType::TaskStart => {
if let Ok(data) = event.parse_data::<TaskStartData>() {
let desc = truncate_str(&data.description, 120);
format!("task_start: {}", desc)
} else {
"task_start".to_string()
}
}
EventType::ThinkingStart => {
if let Some(iter) = event.data.get("iteration").and_then(|v| v.as_u64()) {
format!("thinking_start: iteration {}", iter)
} else {
"thinking_start".to_string()
}
}
EventType::ToolCall => {
if let Ok(data) = event.parse_data::<ToolCallData>() {
let detail = if self.include_raw_args {
truncate_str(&data.arguments.to_string(), 160)
} else {
data.summary
.unwrap_or_else(|| truncate_str(&data.arguments.to_string(), 100))
};
format!("tool_call {} -> {}", data.name, detail)
} else {
"tool_call".to_string()
}
}
EventType::ToolResult => {
if let Ok(data) = event.parse_data::<ToolResultData>() {
let status = if data.success { "ok" } else { "err" };
let detail = if let Some(err) = data.error {
truncate_str(&err, 140)
} else {
truncate_str(&data.result, 140)
};
format!(
"tool_result {} [{}] {}ms -> {}",
data.name, status, data.duration_ms, detail
)
} else {
"tool_result".to_string()
}
}
EventType::Error => {
if let Ok(data) = event.parse_data::<ErrorData>() {
format!(
"error {:?}: {}",
data.error_type,
truncate_str(&data.message, 160)
)
} else {
"error".to_string()
}
}
EventType::TaskEnd => {
if let Ok(data) = event.parse_data::<TaskEndData>() {
let detail = data
.error
.as_deref()
.map(|e| truncate_str(e, 120))
.unwrap_or_else(|| "-".to_string());
format!(
"task_end {} outcome={} duration={}s tool_calls={} err={}",
data.status.as_str(),
data.effective_outcome().as_str(),
data.duration_secs,
data.tool_calls_count,
detail
)
} else {
"task_end".to_string()
}
}
EventType::DecisionPoint => {
if let Ok(data) = event.parse_data::<DecisionPointData>() {
let code = data
.code
.unwrap_or_else(|| data.decision_type.as_str().to_string());
format!(
">>> DECISION {:?} [{}:{}] (iter {}) {}",
data.decision_type,
data.severity.as_str(),
code,
data.iteration,
truncate_str(&data.summary, 140)
)
} else {
">>> DECISION".to_string()
}
}
EventType::ApprovalDenied => {
if let Ok(data) = event.parse_data::<ApprovalDeniedData>() {
format!(
"approval_denied command={}",
truncate_str(&data.command, 120)
)
} else {
"approval_denied".to_string()
}
}
_ => event.event_type.as_str().to_string(),
}
}
/// Formats events as one line each: RFC 3339 timestamp, `#id`, event type,
/// and the secret-redacted brief.
///
/// Returns a fixed placeholder sentence when `events` is empty. Trailing
/// whitespace is trimmed from the final text.
fn timeline_text(&self, events: &[Event]) -> String {
    if events.is_empty() {
        return "No events found for this task in the current session.".to_string();
    }
    let rendered: Vec<String> = events
        .iter()
        .map(|event| {
            let brief = self.brief_for_event(event);
            format!(
                "{} [#{} {}] {}",
                event.created_at.to_rfc3339(),
                event.id,
                event.event_type.as_str(),
                redact_secrets(&brief)
            )
        })
        .collect();
    rendered.join("\n").trim_end().to_string()
}
/// Builds the "Task Timeline" report: a Markdown header with session, task,
/// and event count, followed by the formatted timeline lines.
///
/// # Errors
/// Propagates errors from collecting the task timeline.
async fn timeline(&self, session_id: &str, task_id: &str) -> anyhow::Result<String> {
    let events = self.collect_task_timeline(session_id, task_id).await?;
    let header = format!(
        "**Task Timeline**\n\n- Session: {}\n- Task: {}\n- Events: {}\n\n",
        session_id,
        task_id,
        events.len()
    );
    let body = self.timeline_text(&events);
    Ok(header + &body)
}
/// Converts an event into an `EvidenceRef` carrying its id, type label,
/// RFC 3339 timestamp, and a brief summary that is truncated to 180
/// characters and then secret-redacted.
fn evidence_from_event(&self, event: &Event) -> crate::events::EvidenceRef {
    let brief = self.brief_for_event(event);
    let clipped = truncate_str(&brief, 180);
    crate::events::EvidenceRef {
        event_id: event.id,
        event_type: event.event_type.as_str().to_string(),
        timestamp: event.created_at.to_rfc3339(),
        summary: redact_secrets(&clipped),
    }
}
/// Appends `line` to `lines` unless it is empty or an identical line is
/// already present.
fn push_unique_line(lines: &mut Vec<String>, line: String) {
    if line.is_empty() {
        return;
    }
    let already_present = lines.iter().any(|existing| *existing == line);
    if !already_present {
        lines.push(line);
    }
}
/// Reads `key` from a JSON metadata value as a trimmed string.
///
/// Returns `None` when the key is absent, the value is not a JSON string, or
/// the string is empty after trimming.
fn metadata_string(metadata: &Value, key: &str) -> Option<String> {
    let raw = metadata.get(key)?.as_str()?;
    let trimmed = raw.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_string())
    }
}
/// Makes a snake_case code readable by turning every underscore into a space.
fn pretty_code(code: &str) -> String {
    code.chars()
        .map(|ch| if ch == '_' { ' ' } else { ch })
        .collect()
}
/// Scans decision-point events and condenses them into an
/// `ExecutionReplaySummary`.
///
/// Events that are not decision points, and decision points whose payload
/// fails to parse, are skipped. Human-readable lines are added through
/// `push_unique_line` (so identical lines appear once); evidence references
/// are appended without de-duplication.
fn build_execution_replay_summary(&self, events: &[Event]) -> ExecutionReplaySummary {
let mut summary = ExecutionReplaySummary::default();
for event in events {
if event.event_type != EventType::DecisionPoint {
continue;
}
let Ok(data) = event.parse_data::<DecisionPointData>() else {
continue;
};
let evidence = self.evidence_from_event(event);
let metadata = &data.metadata;
// Tool and target are optional metadata; placeholders keep the lines readable.
let tool = Self::metadata_string(metadata, "tool")
.unwrap_or_else(|| "unknown tool".to_string());
// `target_hint` takes precedence over `target`.
let target = Self::metadata_string(metadata, "target_hint")
.or_else(|| Self::metadata_string(metadata, "target"))
.unwrap_or_else(|| "unspecified target".to_string());
match data.decision_type {
// Planning gate: always recorded as a plan revision; counted as plan
// failure evidence only for rejected/unavailable results.
DecisionType::ExecutionPlanningGate => {
let gate_result = Self::metadata_string(metadata, "gate_result")
.or_else(|| Self::metadata_string(metadata, "condition"))
.unwrap_or_else(|| "unknown".to_string());
let reason = Self::metadata_string(metadata, "reason");
let mut line = format!(
"Event #{}: {} for {} targeting {}.",
event.id,
Self::pretty_code(&gate_result),
tool,
target
);
if let Some(reason) = reason {
line.push_str(&format!(" Reason: {}.", reason));
}
Self::push_unique_line(&mut summary.plan_revisions, line);
if matches!(
gate_result.as_str(),
"plan_rejected" | "rejected" | "plan_unavailable" | "unavailable"
) {
summary.plan_failure_evidence.push(evidence.clone());
}
}
// Critique pass: also recorded under plan revisions; rejected, invalid,
// or unavailable critiques count as plan failure evidence.
DecisionType::ExecutionCritiquePass => {
let critique_result = Self::metadata_string(metadata, "critique_result")
.or_else(|| Self::metadata_string(metadata, "condition"))
.unwrap_or_else(|| "unknown".to_string());
// `summary` metadata is preferred over `reason` for the detail text.
let detail = Self::metadata_string(metadata, "summary")
.or_else(|| Self::metadata_string(metadata, "reason"));
let mut line = format!(
"Event #{}: {} for {} targeting {}.",
event.id,
Self::pretty_code(&critique_result),
tool,
target
);
if let Some(detail) = detail {
line.push_str(&format!(" Detail: {}.", detail));
}
Self::push_unique_line(&mut summary.plan_revisions, line);
if matches!(
critique_result.as_str(),
"critique_rejected"
| "rejected"
| "critique_invalid"
| "invalid"
| "critique_unavailable"
| "unavailable"
) {
summary.plan_failure_evidence.push(evidence.clone());
}
}
// Evidence gate: every occurrence is both a trigger line and failure evidence.
DecisionType::EvidenceGate => {
let evidence_kind = Self::metadata_string(metadata, "required_evidence_kind")
.unwrap_or_else(|| "required evidence".to_string());
let line = format!(
"Event #{}: blocked {} until {} existed for {}.",
event.id, tool, evidence_kind, target
);
Self::push_unique_line(&mut summary.evidence_gate_triggers, line);
summary.evidence_failure_evidence.push(evidence.clone());
}
// Post-execution validation: every occurrence is recorded as a validation
// failure, regardless of the `outcome` value.
DecisionType::PostExecutionValidation => {
let outcome = Self::metadata_string(metadata, "outcome")
.unwrap_or_else(|| "validation_blocked".to_string());
let reason = Self::metadata_string(metadata, "reason")
.or_else(|| Self::metadata_string(metadata, "condition"))
.unwrap_or_else(|| "validation_blocked".to_string());
let line = format!(
"Event #{}: {} because {}.",
event.id,
Self::pretty_code(&outcome),
Self::pretty_code(&reason)
);
Self::push_unique_line(&mut summary.validation_failures, line);
summary.validation_failure_evidence.push(evidence.clone());
}
// Failure classification: only scope/contract violations are recorded.
// NOTE(review): the line goes to `validation_failures` while the evidence
// goes to `plan_failure_evidence` — looks deliberate, but confirm.
DecisionType::ExecutionFailureClassification => {
let condition = Self::metadata_string(metadata, "condition")
.unwrap_or_else(|| "execution_failure".to_string());
if matches!(
condition.as_str(),
"target_scope_violation" | "tool_contract_violation"
) {
let line = format!(
"Event #{}: {} on {}.",
event.id,
Self::pretty_code(&condition),
tool
);
Self::push_unique_line(&mut summary.validation_failures, line);
summary.plan_failure_evidence.push(evidence.clone());
}
}
// State snapshot: only the `execution_budget_exhausted` condition is recorded.
DecisionType::ExecutionStateSnapshot => {
if let Some(condition) = Self::metadata_string(metadata, "condition") {
if condition == "execution_budget_exhausted" {
let limit = Self::metadata_string(metadata, "budget_limit")
.unwrap_or_else(|| "execution budget".to_string());
let line = format!(
"Event #{}: execution budget exhausted on {}.",
event.id,
Self::pretty_code(&limit)
);
Self::push_unique_line(&mut summary.validation_failures, line);
summary.validation_failure_evidence.push(evidence.clone());
}
}
}
_ => {}
}
// Independent of the decision type: any decision point may carry a
// loop-repetition reason, which is listed under retry reasons.
if let Some(reason) = Self::metadata_string(metadata, "loop_repetition_reason") {
Self::push_unique_line(
&mut summary.retry_reasons,
format!(
"Event #{}: loop repeated because {}.",
event.id,
Self::pretty_code(&reason)
),
);
}
}
summary
}
/// Renders the "Execution Replay Summary" Markdown section.
///
/// Four groups are printed in a fixed order (plan revisions, evidence gate
/// triggers, validation failures, retry reasons). Each group shows at most
/// its first five entries, or `none` when it is empty.
fn format_execution_replay_section(&self, summary: &ExecutionReplaySummary) -> String {
    let groups: [(&str, &[String]); 4] = [
        ("Plan revisions", summary.plan_revisions.as_slice()),
        (
            "Evidence gate triggers",
            summary.evidence_gate_triggers.as_slice(),
        ),
        (
            "Validation failures",
            summary.validation_failures.as_slice(),
        ),
        ("Retry reasons", summary.retry_reasons.as_slice()),
    ];

    let mut out = String::from("### Execution Replay Summary\n");
    for (title, items) in groups {
        out.push_str(&format!("- {}:\n", title));
        if items.is_empty() {
            out.push_str(" - none\n");
            continue;
        }
        for item in items.iter().take(5) {
            out.push_str(&format!(" - {}\n", item));
        }
    }
    out.trim_end().to_string()
}
/// Derives a rule-based diagnosis from a task's events and its replay summary.
///
/// The events are scanned once to collect failure signals (errors, failed
/// tool results, denied approvals, loop/stall decisions, failed task end).
/// Each non-empty signal bucket becomes a `RootCauseCandidate` with a fixed
/// confidence; candidates are sorted by descending confidence and the top
/// category selects the suggested fix and verification steps.
///
/// * `events` - the task timeline to analyse.
/// * `replay_summary` - evidence buckets produced from decision points.
///
/// Returns a `DeterministicAnalysis` whose `what_failed` text is
/// secret-redacted and whose top-level `why_previous_step_looked_valid` is
/// left as `None`.
fn build_deterministic_analysis(
&self,
events: &[Event],
replay_summary: &ExecutionReplaySummary,
) -> DeterministicAnalysis {
let mut first_error: Option<crate::events::EvidenceRef> = None;
let mut task_end_failed: Option<(TaskEndData, crate::events::EvidenceRef)> = None;
let mut approval_denied = Vec::new();
let mut tool_failures = Vec::new();
let mut provider_errors = Vec::new();
let mut missing_context = Vec::new();
let mut loop_signals = Vec::new();
for event in events {
let ev = self.evidence_from_event(event);
match event.event_type {
EventType::Error => {
// The first error (or failed tool result) is remembered even when
// its payload cannot be parsed.
if first_error.is_none() {
first_error = Some(ev.clone());
}
if let Ok(data) = event.parse_data::<ErrorData>() {
let lower = data.message.to_ascii_lowercase();
// Provider-side failure: by error type, or by keywords in the message.
if matches!(
data.error_type,
ErrorType::LlmError | ErrorType::Timeout | ErrorType::RateLimit
) || lower.contains("rate limit")
|| lower.contains("provider")
|| lower.contains("429")
{
provider_errors.push(ev.clone());
}
// Missing-resource heuristics based on message keywords; an error
// may land in both the provider and the missing-context bucket.
if lower.contains("no such file")
|| lower.contains("not found")
|| lower.contains("missing")
{
missing_context.push(ev.clone());
}
}
}
EventType::ToolResult => {
if let Ok(data) = event.parse_data::<ToolResultData>() {
if !data.success {
if first_error.is_none() {
first_error = Some(ev.clone());
}
tool_failures.push(ev);
}
}
}
EventType::ApprovalDenied => approval_denied.push(ev),
EventType::DecisionPoint => {
if let Ok(data) = event.parse_data::<DecisionPointData>() {
// `&&` binds tighter than `||`: loop-detection decisions always
// count, while a stopping condition counts only when its summary
// mentions "stall".
if matches!(
data.decision_type,
DecisionType::RepetitiveCallDetection
| DecisionType::ConsecutiveSameToolDetection
| DecisionType::AlternatingPatternDetection
) || matches!(data.decision_type, DecisionType::StoppingCondition)
&& data.summary.to_ascii_lowercase().contains("stall")
{
loop_signals.push(ev);
}
}
}
EventType::TaskEnd => {
if let Ok(data) = event.parse_data::<TaskEndData>() {
if data.effective_outcome() == crate::events::TaskOutcome::Failed {
// A failed end whose error mentions "stall" is also a loop signal.
if data
.error
.as_deref()
.is_some_and(|e| e.to_ascii_lowercase().contains("stall"))
{
loop_signals.push(ev.clone());
}
// A later failed task end replaces an earlier one.
task_end_failed = Some((data, ev));
}
}
}
_ => {}
}
}
// Headline preference: failed task end (error, then summary), then the first
// error's brief, then a generic sentence.
let what_failed = if let Some((failed, _)) = &task_end_failed {
failed
.error
.clone()
.or_else(|| failed.summary.clone())
.unwrap_or_else(|| "Task ended as failed".to_string())
} else if let Some(err) = &first_error {
err.summary.clone()
} else {
"Task did not complete as expected.".to_string()
};
// One candidate per non-empty signal bucket, each citing at most three
// pieces of evidence.
let mut candidates: Vec<RootCauseCandidate> = Vec::new();
if !approval_denied.is_empty() {
candidates.push(RootCauseCandidate {
category: FailureCategory::SandboxPermissionBlock,
confidence: 0.9,
description: "Execution required approval and the command was denied or timed out."
.to_string(),
evidence: approval_denied.iter().take(3).cloned().collect(),
why_previous_step_looked_valid: Some(
"The agent selected an executable path, but approval policy blocked execution."
.to_string(),
),
});
}
if !replay_summary.plan_failure_evidence.is_empty() {
candidates.push(RootCauseCandidate {
category: FailureCategory::BadAssumption,
confidence: 0.84,
description:
"Planning/scope signals show the selected target or first step was invalid before safe execution could continue."
.to_string(),
evidence: replay_summary
.plan_failure_evidence
.iter()
.take(3)
.cloned()
.collect(),
why_previous_step_looked_valid: Some(
"The step looked executable at first, but structured planning or scope checks showed it targeted the wrong action or target."
.to_string(),
),
});
}
if !replay_summary.evidence_failure_evidence.is_empty() {
candidates.push(RootCauseCandidate {
category: FailureCategory::MissingContext,
confidence: 0.85,
description:
"Execution was blocked because the agent did not have direct evidence for the target state yet."
.to_string(),
evidence: replay_summary
.evidence_failure_evidence
.iter()
.take(3)
.cloned()
.collect(),
why_previous_step_looked_valid: Some(
"The intended action made sense, but required proof about the current target state was still missing."
.to_string(),
),
});
}
if !replay_summary.validation_failure_evidence.is_empty() {
candidates.push(RootCauseCandidate {
category: FailureCategory::PartialCompletionRegression,
confidence: 0.83,
description:
"Execution made progress, but validation blocked completion or forced scope reduction before success could be claimed."
.to_string(),
evidence: replay_summary
.validation_failure_evidence
.iter()
.take(3)
.cloned()
.collect(),
why_previous_step_looked_valid: Some(
"The last execution step looked productive, but the result still lacked the verification needed for a safe success claim."
.to_string(),
),
});
}
if !loop_signals.is_empty() {
candidates.push(RootCauseCandidate {
category: FailureCategory::AgentLoop,
confidence: 0.86,
description:
"Loop guards/stall signals show repeated behavior without meaningful progress."
.to_string(),
evidence: loop_signals.iter().take(3).cloned().collect(),
why_previous_step_looked_valid: Some(
"Each step appeared actionable in isolation, but repeated patterns prevented net progress."
.to_string(),
),
});
}
if !provider_errors.is_empty() {
candidates.push(RootCauseCandidate {
category: FailureCategory::ProviderError,
confidence: 0.82,
description: "LLM/provider-side failure signals were detected.".to_string(),
evidence: provider_errors.iter().take(3).cloned().collect(),
why_previous_step_looked_valid: None,
});
}
if !tool_failures.is_empty() {
candidates.push(RootCauseCandidate {
category: FailureCategory::ToolFailure,
confidence: 0.76,
description: "One or more tool executions failed and cascaded into task failure."
.to_string(),
evidence: tool_failures.iter().take(3).cloned().collect(),
why_previous_step_looked_valid: None,
});
}
// Note: this is a second possible `MissingContext` candidate (the evidence-gate
// one above uses the same category), so the list may contain the category twice.
if !missing_context.is_empty() {
candidates.push(RootCauseCandidate {
category: FailureCategory::MissingContext,
confidence: 0.72,
description:
"Execution referenced resources that appear missing or unresolved in context."
.to_string(),
evidence: missing_context.iter().take(3).cloned().collect(),
why_previous_step_looked_valid: None,
});
}
// No signal at all: fall back to a low-confidence "incorrect result" candidate
// that cites the last event, if there is one.
if candidates.is_empty() {
let fallback_evidence = events
.last()
.map(|e| vec![self.evidence_from_event(e)])
.unwrap_or_default();
candidates.push(RootCauseCandidate {
category: FailureCategory::IncorrectResult,
confidence: 0.5,
description:
"No strong deterministic failure signature was found; likely output quality issue."
.to_string(),
evidence: fallback_evidence,
why_previous_step_looked_valid: None,
});
}
// Highest confidence first; the top category drives the advice below.
candidates.sort_by(|a, b| b.confidence.total_cmp(&a.confidence));
let top = candidates.first().map(|c| c.category);
// Categories without a dedicated arm (e.g. MissingContext, IncorrectResult)
// receive the generic advice from the `_` arm.
let (minimal_fix, verification_steps) = match top {
Some(FailureCategory::SandboxPermissionBlock) => (
vec![
"Re-run with explicit approval for the blocked command.".to_string(),
"If this is expected recurring behavior, grant session/persistent approval as appropriate."
.to_string(),
],
vec![
"Confirm an `approval_granted` event appears for the command.".to_string(),
"Confirm subsequent `tool_result` is successful.".to_string(),
],
),
Some(FailureCategory::AgentLoop) => (
vec![
"Add a stricter stop condition or vary tool strategy after first repeated failure."
.to_string(),
"Inject a clarifying question earlier when progress is ambiguous.".to_string(),
],
vec![
"Verify no repetitive/consecutive/alternating decision-point events fire on retry."
.to_string(),
"Verify task ends with `completed` instead of stall/failure.".to_string(),
],
),
Some(FailureCategory::BadAssumption) => (
vec![
"Reconfirm the exact target, repo, or first risky step before executing again."
.to_string(),
"Prefer the target/plan candidate that is backed by direct evidence instead of inference."
.to_string(),
],
vec![
"Confirm a new planning or scope-check decision point accepts the revised target."
.to_string(),
"Confirm the next execution attempt stays within the corrected target scope."
.to_string(),
],
),
Some(FailureCategory::ProviderError) => (
vec![
"Retry with fallback model/provider and backoff.".to_string(),
"Check provider auth/rate-limit status.".to_string(),
],
vec![
"No new provider/LLM error events in rerun.".to_string(),
"Task reaches successful completion.".to_string(),
],
),
Some(FailureCategory::ToolFailure) => (
vec![
"Fix tool arguments/environment based on the first failing tool result."
.to_string(),
"Avoid retrying unchanged failing tool calls repeatedly.".to_string(),
],
vec![
"First previously failing tool call now returns success.".to_string(),
"Task completes without downstream failures.".to_string(),
],
),
Some(FailureCategory::PartialCompletionRegression) => (
vec![
"Run the missing verification step or reduce scope before claiming success."
.to_string(),
"If verification cannot run in the current phase, surface a specific partial result instead of forcing completion."
.to_string(),
],
vec![
"Confirm a post-execution validation event records a verified completion instead of `verification_pending`."
.to_string(),
"Confirm the final reply matches the validated scope and outcome."
.to_string(),
],
),
_ => (
vec!["Apply a minimal correction and rerun from the first divergence point."
.to_string()],
vec!["Confirm final output matches expected behavior.".to_string()],
),
};
DeterministicAnalysis {
what_failed: redact_secrets(&what_failed),
candidates,
why_previous_step_looked_valid: None,
minimal_fix,
verification_steps,
}
}
/// Asks the fast model to re-rank the deterministic candidates for a task.
///
/// The prompt contains the allowed category names (taken from the
/// deterministic candidates), the deterministic analysis as pretty JSON, and
/// the timeline text truncated to 12000 characters. After a successful
/// provider call, background model-call telemetry is recorded.
///
/// Returns `None` when the baseline cannot be serialized, the provider call
/// fails, the response has neither content nor thinking text, no JSON object
/// can be extracted, or the JSON does not deserialize.
async fn llm_rank_candidates(
    &self,
    task_id: &str,
    timeline_text: &str,
    deterministic: &DeterministicAnalysis,
) -> Option<LlmDiagnosisResponse> {
    // Serialize each category and strip the surrounding JSON quotes to get its name.
    let allowed_categories: Vec<String> = deterministic
        .candidates
        .iter()
        .map(|candidate| {
            let encoded = serde_json::to_string(&candidate.category)
                .unwrap_or_else(|_| "\"incorrect_result\"".to_string());
            encoded.trim_matches('"').to_string()
        })
        .collect();
    let baseline = serde_json::to_string_pretty(&deterministic).ok()?;
    let allowed = allowed_categories.join(", ");
    let timeline = truncate_str(timeline_text, 12000);

    let system =
        "You are a strict diagnostics ranker. Ground every claim in provided event IDs. \
        Return JSON only, no markdown.";
    let user = format!(
        "Task: {task_id}\n\
        Allowed categories (must choose only from these): {allowed}\n\n\
        Deterministic baseline:\n{baseline}\n\n\
        Timeline:\n{timeline}\n\n\
        Return JSON object:\n\
        {{\n\
        \"what_failed\": \"...\",\n\
        \"candidates\": [{{\"category\":\"...\",\"confidence\":0.0,\"description\":\"...\",\"evidence_event_ids\":[1,2],\"why_previous_step_looked_valid\":\"...\"}}],\n\
        \"why_previous_step_looked_valid\": \"...\",\n\
        \"minimal_fix\": [\"...\"],\n\
        \"verification_steps\": [\"...\"]\n\
        }}\n\
        Rules: no invented event IDs; confidence 0..1."
    );
    let messages = vec![
        json!({"role":"system","content":system}),
        json!({"role":"user","content":user}),
    ];

    let runtime_snapshot = self.llm_runtime.snapshot();
    let fast_model = runtime_snapshot.fast_model();
    let call_start = std::time::Instant::now();
    // Ranking is best-effort: a provider failure simply yields no LLM analysis.
    let resp: ProviderResponse = runtime_snapshot
        .provider()
        .chat(&fast_model, &messages, &[])
        .await
        .ok()?;
    crate::events::record_background_model_call_telemetry(
        self.event_store.clone(),
        self.state.as_ref(),
        "background:diagnose",
        "diagnose",
        &fast_model,
        &resp,
        call_start.elapsed(),
    )
    .await;

    // Prefer the regular content; fall back to the thinking text.
    resp.content.or(resp.thinking).and_then(|raw| {
        let json_text = extract_json_object(&raw)?;
        serde_json::from_str::<LlmDiagnosisResponse>(&json_text).ok()
    })
}
/// Blends an optional LLM diagnosis into the deterministic analysis.
///
/// * `deterministic` - the rule-based analysis; returned unchanged when `llm`
///   is `None`.
/// * `llm` - the parsed LLM response, if any.
/// * `events` - the task timeline, used to resolve LLM-cited event ids.
///
/// For every LLM candidate whose category already exists in the
/// deterministic list, the confidence becomes `0.6 * deterministic +
/// 0.4 * llm` (clamped to `0..=1`), a non-blank description replaces the
/// existing one, cited event ids that exist in `events` replace the evidence
/// (unknown ids are ignored), and a non-blank rationale replaces the existing
/// one. LLM candidates with categories not present in the deterministic list
/// are ignored. Candidates are re-sorted by descending confidence, and
/// non-blank LLM headline/rationale/fix/verification texts override the
/// deterministic ones. All LLM-provided text is secret-redacted.
fn merge_llm_analysis(
    &self,
    mut deterministic: DeterministicAnalysis,
    llm: Option<LlmDiagnosisResponse>,
    events: &[Event],
) -> DeterministicAnalysis {
    let Some(llm) = llm else {
        return deterministic;
    };

    let evidence_by_id: HashMap<i64, crate::events::EvidenceRef> = events
        .iter()
        .map(|event| (event.id, self.evidence_from_event(event)))
        .collect();

    // A category can occur more than once in the deterministic list (the
    // builder in this file can emit two `MissingContext` candidates). Keep the
    // FIRST index per category: the builder sorts by descending confidence, so
    // the LLM's feedback is merged into the strongest candidate instead of
    // being silently redirected to a weaker duplicate.
    let mut by_category: HashMap<FailureCategory, usize> = HashMap::new();
    for (idx, c) in deterministic.candidates.iter().enumerate() {
        by_category.entry(c.category).or_insert(idx);
    }

    if let Some(cands) = llm.candidates {
        for cand in cands {
            let Some(idx) = by_category.get(&cand.category).copied() else {
                continue;
            };
            let existing = &mut deterministic.candidates[idx];
            let llm_conf = cand.confidence.clamp(0.0, 1.0);
            existing.confidence = (existing.confidence * 0.6 + llm_conf * 0.4).clamp(0.0, 1.0);
            if !cand.description.trim().is_empty() {
                existing.description = redact_secrets(&cand.description);
            }
            // Only event ids that exist in this timeline are accepted as evidence.
            let mapped: Vec<crate::events::EvidenceRef> = cand
                .evidence_event_ids
                .into_iter()
                .filter_map(|id| evidence_by_id.get(&id).cloned())
                .collect();
            if !mapped.is_empty() {
                existing.evidence = mapped;
            }
            if let Some(why) = cand.why_previous_step_looked_valid {
                if !why.trim().is_empty() {
                    existing.why_previous_step_looked_valid =
                        Some(redact_secrets(&truncate_str(&why, 220)));
                }
            }
        }
    }
    deterministic
        .candidates
        .sort_by(|a, b| b.confidence.total_cmp(&a.confidence));

    if let Some(what_failed) = llm.what_failed {
        if !what_failed.trim().is_empty() {
            deterministic.what_failed = redact_secrets(&truncate_str(&what_failed, 300));
        }
    }
    if let Some(why) = llm.why_previous_step_looked_valid {
        if !why.trim().is_empty() {
            deterministic.why_previous_step_looked_valid =
                Some(redact_secrets(&truncate_str(&why, 260)));
        }
    }
    if let Some(fixes) = llm.minimal_fix {
        let filtered: Vec<String> = fixes
            .into_iter()
            .filter(|s| !s.trim().is_empty())
            .map(|s| redact_secrets(&truncate_str(&s, 200)))
            .collect();
        // An all-blank list keeps the deterministic advice.
        if !filtered.is_empty() {
            deterministic.minimal_fix = filtered;
        }
    }
    if let Some(steps) = llm.verification_steps {
        let filtered: Vec<String> = steps
            .into_iter()
            .filter(|s| !s.trim().is_empty())
            .map(|s| redact_secrets(&truncate_str(&s, 200)))
            .collect();
        if !filtered.is_empty() {
            deterministic.verification_steps = filtered;
        }
    }
    deterministic
}
/// Extracts the task id that follows the first `"Resumed in task "` marker
/// in an error message.
///
/// The id is the first whitespace-delimited token after the marker, with
/// leading backticks, then trailing dots, then trailing backticks removed.
/// Returns `None` when the marker is absent or no token remains.
fn parse_resumed_task_id_from_error(&self, text: &str) -> Option<String> {
    const MARKER: &str = "Resumed in task ";
    let (_, after_marker) = text.split_once(MARKER)?;
    let token = after_marker
        .trim()
        .trim_start_matches('`')
        .split_whitespace()
        .next()
        .unwrap_or("");
    let token = token.trim_end_matches('.').trim_end_matches('`').trim();
    (!token.is_empty()).then(|| token.to_string())
}
/// Folds a task's events into a `ResumeCheckpointSnapshot`.
///
/// Tracks the first task-start description / user message / parent task id,
/// the highest thinking iteration, the number of tool results, tool-call ids
/// announced by the assistant that have no result yet (returned sorted), the
/// latest non-blank assistant/tool/error texts (truncated to 180 characters),
/// the task-end duration, and the task id this task was resumed into (parsed
/// from the task-end error). Events whose payload fails to parse are ignored.
fn checkpoint_snapshot_from_events(
    &self,
    task_id: &str,
    events: &[Event],
) -> ResumeCheckpointSnapshot {
    /// Trims `text` and returns it only when something is left.
    fn non_blank(text: &str) -> Option<&str> {
        let trimmed = text.trim();
        (!trimmed.is_empty()).then_some(trimmed)
    }

    let mut snapshot = ResumeCheckpointSnapshot {
        task_id: task_id.to_string(),
        description: None,
        original_user_message: None,
        parent_task_id: None,
        resumed_into_task_id: None,
        duration_secs: None,
        last_iteration: 0,
        tool_results_count: 0,
        pending_tool_call_ids: Vec::new(),
        last_assistant_summary: None,
        last_tool_summary: None,
        last_error: None,
    };
    let mut pending: HashSet<String> = HashSet::new();

    for event in events {
        match event.event_type {
            EventType::TaskStart => {
                let Ok(start) = event.parse_data::<TaskStartData>() else {
                    continue;
                };
                // Only the first task start contributes each of these fields.
                snapshot.description.get_or_insert(start.description);
                snapshot.original_user_message =
                    snapshot.original_user_message.take().or(start.user_message);
                snapshot.parent_task_id =
                    snapshot.parent_task_id.take().or(start.parent_task_id);
            }
            EventType::ThinkingStart => {
                let Ok(thinking) = event.parse_data::<ThinkingStartData>() else {
                    continue;
                };
                snapshot.last_iteration = snapshot.last_iteration.max(thinking.iteration);
            }
            EventType::AssistantResponse => {
                let Ok(reply) = event.parse_data::<AssistantResponseData>() else {
                    continue;
                };
                // Every announced tool call is pending until its result arrives.
                pending.extend(reply.tool_calls.into_iter().flatten().map(|call| call.id));
                if let Some(text) = reply.content.as_deref().and_then(non_blank) {
                    snapshot.last_assistant_summary = Some(truncate_str(text, 180));
                }
            }
            EventType::ToolResult => {
                let Ok(outcome) = event.parse_data::<ToolResultData>() else {
                    continue;
                };
                snapshot.tool_results_count = snapshot.tool_results_count.saturating_add(1);
                pending.remove(&outcome.tool_call_id);
                // The error text takes precedence over the regular result.
                let detail = match outcome.error {
                    Some(err) => err,
                    None => outcome.result,
                };
                if let Some(text) = non_blank(&detail) {
                    snapshot.last_tool_summary = Some(truncate_str(text, 180));
                }
            }
            EventType::Error => {
                let Ok(failure) = event.parse_data::<ErrorData>() else {
                    continue;
                };
                if let Some(text) = non_blank(&failure.message) {
                    snapshot.last_error = Some(truncate_str(text, 180));
                }
            }
            EventType::TaskEnd => {
                let Ok(end) = event.parse_data::<TaskEndData>() else {
                    continue;
                };
                snapshot.duration_secs = Some(end.duration_secs);
                if let Some(text) = end.error.as_deref().and_then(non_blank) {
                    snapshot.last_error = Some(truncate_str(text, 180));
                    snapshot.resumed_into_task_id = self.parse_resumed_task_id_from_error(text);
                }
            }
            _ => {}
        }
    }

    // Sort so the pending ids come out in a deterministic order.
    let mut pending_ids: Vec<String> = pending.into_iter().collect();
    pending_ids.sort();
    snapshot.pending_tool_call_ids = pending_ids;
    snapshot
}
async fn build_resume_recovery_section(
&self,
session_id: &str,
task_id: &str,
task_events: &[Event],
) -> String {
let current = self.checkpoint_snapshot_from_events(task_id, task_events);
let mut lines = vec!["### Resume Recovery State".to_string()];
lines.push(format!(
"- Current task: `{}` (iteration {}, tool_results {}, pending_tool_calls {})",
current.task_id,
current.last_iteration,
current.tool_results_count,
current.pending_tool_call_ids.len()
));
if let Some(desc) = current.description.as_ref() {
lines.push(format!("- Current description: {}", redact_secrets(desc)));
}
if let Some(parent) = current.parent_task_id.as_ref() {
lines.push(format!("- Resumed from previous task: `{}`", parent));
let parent_events = self
.event_store
.query_task_events_for_session(session_id, parent)
.await
.unwrap_or_default();
if !parent_events.is_empty() {
let parent_snapshot = self.checkpoint_snapshot_from_events(parent, &parent_events);
if let Some(duration) = parent_snapshot.duration_secs {
lines.push(format!(
"- Previous elapsed before interruption: {}s",
duration
));
}
lines.push(format!(
"- Previous checkpoint: iteration {}, tool_results {}, pending_tool_calls {}",
parent_snapshot.last_iteration,
parent_snapshot.tool_results_count,
parent_snapshot.pending_tool_call_ids.len()
));
if let Some(desc) = parent_snapshot.description.as_ref() {
lines.push(format!("- Previous description: {}", redact_secrets(desc)));
}
if let Some(msg) = parent_snapshot.original_user_message.as_ref() {
lines.push(format!(
"- Previous user request: {}",
redact_secrets(&truncate_str(msg, 180))
));
}
if let Some(summary) = parent_snapshot.last_assistant_summary.as_ref() {
lines.push(format!(
"- Previous last assistant output: {}",
redact_secrets(summary)
));
}
if let Some(summary) = parent_snapshot.last_tool_summary.as_ref() {
lines.push(format!(
"- Previous last tool result: {}",
redact_secrets(summary)
));
}
if let Some(err) = parent_snapshot.last_error.as_ref() {
lines.push(format!(
"- Previous interruption/error: {}",
redact_secrets(err)
));
}
if let Some(next_task) = parent_snapshot.resumed_into_task_id.as_ref() {
lines.push(format!("- Previous task resumed into: `{}`", next_task));
}
} else {
lines.push("- Previous task events not found in this session.".to_string());
}
} else if let Some(next_task) = current.resumed_into_task_id.as_ref() {
lines.push(format!(
"- This task was interrupted and resumed into: `{}`",
next_task
));
if let Some(duration) = current.duration_secs {
lines.push(format!("- Elapsed before interruption: {}s", duration));
}
} else {
lines.push("- No resume linkage detected for this task.".to_string());
}
if let Some(err) = current.last_error.as_ref() {
lines.push(format!("- Current last error: {}", redact_secrets(err)));
}
if let Some(summary) = current.last_assistant_summary.as_ref() {
lines.push(format!(
"- Current last assistant output: {}",
redact_secrets(summary)
));
}
if let Some(summary) = current.last_tool_summary.as_ref() {
lines.push(format!(
"- Current last tool result: {}",
redact_secrets(summary)
));
}
lines.join("\n")
}
/// Assembles the final Markdown diagnosis report for a task.
///
/// Sections appear in this order: what failed, ranked candidates with their
/// evidence, why the previous step looked valid (the analysis-level text,
/// else the top candidate's, else a fixed fallback sentence), the four
/// pre-rendered sections passed in by the caller, the minimal fix, the
/// verification steps, and the recovery section. Trailing whitespace is
/// trimmed from the result.
#[allow(clippy::too_many_arguments)]
fn format_diagnosis(
    &self,
    task_id: &str,
    analysis: &DeterministicAnalysis,
    execution_replay_section: &str,
    route_health_alerts_section: &str,
    route_reason_trends_section: &str,
    context_scope_guards_section: &str,
    recovery_section: &str,
) -> String {
    let mut out = format!(
        "## Diagnosis for Task {}\n\n### What Failed\n{}\n\n### Most Likely Cause(s)\n",
        task_id, analysis.what_failed
    );

    for (position, candidate) in analysis.candidates.iter().enumerate() {
        out.push_str(&format!(
            "{}. **{:?}** (confidence: {:.0}%)\n{}\n",
            position + 1,
            candidate.category,
            candidate.confidence * 100.0,
            candidate.description
        ));
        if candidate.evidence.is_empty() {
            out.push_str("Evidence: (none linked)\n");
            continue;
        }
        let joined = candidate
            .evidence
            .iter()
            .map(|e| {
                format!(
                    "Event #{} ({}): {}",
                    e.event_id,
                    e.timestamp,
                    truncate_str(&e.summary, 140)
                )
            })
            .collect::<Vec<String>>()
            .join(" | ");
        out.push_str(&format!("Evidence: {}\n", joined));
    }

    out.push_str("\n### Why Previous Step Looked Valid\n");
    let rationale = analysis
        .why_previous_step_looked_valid
        .as_deref()
        .or_else(|| {
            analysis
                .candidates
                .first()
                .and_then(|top| top.why_previous_step_looked_valid.as_deref())
        })
        .unwrap_or("No explicit misleading-validity signal detected.");
    out.push_str(rationale);

    // Caller-rendered sections, each separated by a blank line.
    let embedded_sections = [
        execution_replay_section,
        route_health_alerts_section,
        route_reason_trends_section,
        context_scope_guards_section,
    ];
    for section in embedded_sections {
        out.push_str("\n\n");
        out.push_str(section);
    }

    out.push_str("\n\n### Minimal Fix\n");
    for step in analysis.minimal_fix.iter() {
        out.push_str(&format!("- {}\n", step));
    }
    out.push_str("\n### Verification Steps\n");
    for step in analysis.verification_steps.iter() {
        out.push_str(&format!("- {}\n", step));
    }
    out.push('\n');
    out.push_str(recovery_section);
    out.trim_end().to_string()
}
/// Renders the "Context/Scope Guardrails" Markdown section from the counters
/// returned by `crate::agent::policy_metrics_snapshot()`.
///
/// Every counter is listed on its own bullet. The status line reports an
/// alert when the sum of the four context/scope counters (bleed prevented,
/// preflight drops, follow-up overrides, cross-scope blocks) reaches
/// `CONTEXT_SCOPE_ALERT_THRESHOLD`; the route-drift counters are listed but
/// do not contribute to that sum.
fn build_context_scope_guards_section(&self) -> String {
    let snapshot = crate::agent::policy_metrics_snapshot();
    let interventions = snapshot.context_bleed_prevented_total
        + snapshot.context_mismatch_preflight_drop_total
        + snapshot.followup_mode_overrides_total
        + snapshot.cross_scope_blocked_total;

    // Label/value pairs in the order they are printed.
    let counters: [(&str, &dyn std::fmt::Display); 7] = [
        (
            "context_bleed_prevented_total",
            &snapshot.context_bleed_prevented_total,
        ),
        (
            "context_mismatch_preflight_drop_total",
            &snapshot.context_mismatch_preflight_drop_total,
        ),
        (
            "followup_mode_overrides_total",
            &snapshot.followup_mode_overrides_total,
        ),
        (
            "cross_scope_blocked_total",
            &snapshot.cross_scope_blocked_total,
        ),
        ("route_drift_alert_total", &snapshot.route_drift_alert_total),
        (
            "route_drift_failsafe_activation_total",
            &snapshot.route_drift_failsafe_activation_total,
        ),
        (
            "route_failsafe_active_turn_total",
            &snapshot.route_failsafe_active_turn_total,
        ),
    ];

    let mut section = String::from("### Context/Scope Guardrails\n");
    for (label, value) in counters {
        section.push_str(&format!("- {}: {}\n", label, value));
    }

    let status = if interventions >= CONTEXT_SCOPE_ALERT_THRESHOLD {
        format!(
            "- Status: alert ({} guard intervention(s) observed).\n",
            interventions
        )
    } else {
        "- Status: healthy (no context/scope guard interventions observed).\n".to_string()
    };
    section.push_str(&status);
    section.trim_end().to_string()
}
/// Extracts an intent-gate routing sample from `event`.
///
/// Returns `None` when the payload does not parse as `DecisionPointData` or
/// when the decision is not of type `DecisionType::IntentGate`. Missing or
/// non-string `route_reason` / `route_action` metadata falls back to
/// `"unknown"`; `route_reply_len` is kept only when it is an unsigned integer
/// that fits in `usize`.
fn intent_gate_route_sample_from_event(event: &Event) -> Option<IntentGateRouteSample> {
    let decision = event.parse_data::<DecisionPointData>().ok()?;
    if decision.decision_type != DecisionType::IntentGate {
        return None;
    }

    // Reads a string-valued metadata entry, defaulting to "unknown".
    let text_field = |key: &str| -> String {
        match decision.metadata.get(key).and_then(|value| value.as_str()) {
            Some(text) => text.to_string(),
            None => "unknown".to_string(),
        }
    };

    let reply_len = decision
        .metadata
        .get("route_reply_len")
        .and_then(|value| value.as_u64())
        .and_then(|len| usize::try_from(len).ok());

    Some(IntentGateRouteSample {
        created_at: event.created_at,
        route_reason: text_field("route_reason"),
        route_action: text_field("route_action"),
        route_reply_len: reply_len,
    })
}
/// Returns the fraction of `samples` whose `route_reason` equals `reason`.
///
/// The result lies in `[0.0, 1.0]`; an empty slice yields `0.0` rather than
/// dividing by zero.
fn route_reason_rate(samples: &[IntentGateRouteSample], reason: &str) -> f64 {
    let total = samples.len();
    if total == 0 {
        return 0.0;
    }
    let mut matching = 0usize;
    for sample in samples {
        if sample.route_reason == reason {
            matching += 1;
        }
    }
    matching as f64 / total as f64
}
async fn build_route_health_alerts_section(&self, session_id: &str) -> String {
let since = Utc::now() - Duration::days(ROUTE_ALERT_LOOKBACK_DAYS);
let recent_events = self
.event_store
.query_events(session_id, since)
.await
.unwrap_or_default();
let mut samples: Vec<IntentGateRouteSample> = recent_events
.iter()
.filter_map(Self::intent_gate_route_sample_from_event)
.collect();
samples.sort_by_key(|sample| std::cmp::Reverse(sample.created_at));
let mut out = String::from("### Route Health Alerts\n");
if samples.is_empty() {
out.push_str("- No intent-gate decisions found in the lookback window.\n");
return out;
}
let recent_len = samples.len().min(ROUTE_ALERT_RECENT_WINDOW);
let recent = &samples[..recent_len];
let baseline = samples.get(recent_len..).unwrap_or(&[]);
let mut alerts: Vec<String> = Vec::new();
let mut notes: Vec<String> = Vec::new();
let empty_direct_replies: Vec<&IntentGateRouteSample> = samples
.iter()
.filter(|sample| sample.route_action == "return" && sample.route_reply_len == Some(0))
.collect();
if let Some(latest) = empty_direct_replies.first() {
alerts.push(format!(
"Empty direct reply detected {} time(s) in the last {} days (latest at {}).",
empty_direct_replies.len(),
ROUTE_ALERT_LOOKBACK_DAYS,
latest.created_at.to_rfc3339(),
));
}
if recent.len() >= ROUTE_ALERT_RECENT_WINDOW
&& baseline.len() >= ROUTE_ALERT_MIN_BASELINE_WINDOW
{
let recent_tools_rate = Self::route_reason_rate(recent, "tools_required");
let baseline_tools_rate = Self::route_reason_rate(baseline, "tools_required");
let delta = recent_tools_rate - baseline_tools_rate;
if delta.abs() >= ROUTE_ALERT_TOOLS_REQUIRED_RATE_DELTA_THRESHOLD {
alerts.push(format!(
"`tools_required` rate drifted from {:.0}% (baseline, {} turns) to {:.0}% (recent, {} turns), delta {:+.0}pp.",
baseline_tools_rate * 100.0,
baseline.len(),
recent_tools_rate * 100.0,
recent.len(),
delta * 100.0,
));
}
} else {
notes.push(format!(
"Need at least {} recent and {} baseline turns for `tools_required` drift checks (have recent={}, baseline={}).",
ROUTE_ALERT_RECENT_WINDOW,
ROUTE_ALERT_MIN_BASELINE_WINDOW,
recent.len(),
baseline.len(),
));
}
let sustained = samples
.get(0..ROUTE_ALERT_CLARIFICATION_SUSTAINED_WINDOW)
.unwrap_or(&[]);
let clarification_baseline = if baseline.len() >= ROUTE_ALERT_CLARIFICATION_MIN_BASELINE {
baseline
} else {
samples
.get(ROUTE_ALERT_CLARIFICATION_SUSTAINED_WINDOW..)
.unwrap_or(&[])
};
if sustained.len() == ROUTE_ALERT_CLARIFICATION_SUSTAINED_WINDOW
&& clarification_baseline.len() >= ROUTE_ALERT_CLARIFICATION_MIN_BASELINE
{
let sustained_rate = Self::route_reason_rate(sustained, "clarification_required");
let baseline_rate =
Self::route_reason_rate(clarification_baseline, "clarification_required");
let relative_spike = if baseline_rate <= f64::EPSILON {
sustained_rate >= ROUTE_ALERT_CLARIFICATION_MIN_SUSTAINED_RATE
} else {
sustained_rate >= baseline_rate * ROUTE_ALERT_CLARIFICATION_SPIKE_MULTIPLIER
};
let absolute_spike =
(sustained_rate - baseline_rate) >= ROUTE_ALERT_CLARIFICATION_RATE_DELTA_THRESHOLD;
if sustained_rate >= ROUTE_ALERT_CLARIFICATION_MIN_SUSTAINED_RATE
&& (relative_spike || absolute_spike)
{
alerts.push(format!(
"`clarification_required` spiked to {:.0}% in the latest {} turns vs {:.0}% baseline.",
sustained_rate * 100.0,
sustained.len(),
baseline_rate * 100.0,
));
}
} else {
notes.push(format!(
"Need {} latest turns and {} baseline turns for clarification spike checks (have latest={}, baseline={}).",
ROUTE_ALERT_CLARIFICATION_SUSTAINED_WINDOW,
ROUTE_ALERT_CLARIFICATION_MIN_BASELINE,
sustained.len(),
clarification_baseline.len(),
));
}
out.push_str(&format!(
"- Lookback: last {} days in session `{}` ({} intent_gate decisions)\n",
ROUTE_ALERT_LOOKBACK_DAYS,
session_id,
samples.len()
));
if alerts.is_empty() {
out.push_str("- Status: no active route-health alerts.\n");
} else {
out.push_str(&format!("- Status: {} active alert(s).\n", alerts.len()));
out.push_str("- Alerts:\n");
for alert in alerts {
out.push_str(&format!(" - {}\n", alert));
}
}
if !notes.is_empty() {
out.push_str("- Notes:\n");
for note in notes {
out.push_str(&format!(" - {}\n", note));
}
}
out.trim_end().to_string()
}
async fn build_route_reason_trends_section(&self, session_id: &str) -> String {
let recent = self
.event_store
.query_recent_intent_gate_decision_points(session_id, 40)
.await
.unwrap_or_default();
let mut reason_counts: HashMap<String, usize> = HashMap::new();
let mut action_counts: HashMap<String, usize> = HashMap::new();
let mut sequence: Vec<String> = Vec::new();
let mut sampled = 0usize;
for event in &recent {
let Some(sample) = Self::intent_gate_route_sample_from_event(event) else {
continue;
};
*reason_counts
.entry(sample.route_reason.clone())
.or_insert(0) += 1;
*action_counts
.entry(sample.route_action.clone())
.or_insert(0) += 1;
if sequence.len() < 6 {
sequence.push(format!(
"{} -> {} ({})",
sample.route_reason,
sample.route_action,
sample.created_at.to_rfc3339()
));
}
sampled += 1;
}
let mut out = String::from("### Recent Route Reason Trends\n");
if sampled == 0 {
out.push_str("- No recent intent-gate route_reason data found.\n");
return out;
}
let mut ranked_reasons: Vec<(String, usize)> = reason_counts.into_iter().collect();
ranked_reasons.sort_by(|a, b| b.1.cmp(&a.1).then(a.0.cmp(&b.0)));
let mut ranked_actions: Vec<(String, usize)> = action_counts.into_iter().collect();
ranked_actions.sort_by(|a, b| b.1.cmp(&a.1).then(a.0.cmp(&b.0)));
out.push_str(&format!(
"- Window: last {} intent_gate decisions in session `{}`\n",
sampled, session_id
));
out.push_str("- Action mix: ");
out.push_str(
&ranked_actions
.iter()
.map(|(action, count)| format!("{}={}", action, count))
.collect::<Vec<_>>()
.join(", "),
);
out.push('\n');
out.push_str("- Top route reasons:\n");
for (reason, count) in ranked_reasons.iter().take(5) {
let pct = (*count as f64 / sampled as f64) * 100.0;
out.push_str(&format!(" - {}: {} ({:.0}%)\n", reason, count, pct));
}
out.push_str("- Latest route sequence:\n");
for item in &sequence {
out.push_str(&format!(" - {}\n", item));
}
out.trim_end().to_string()
}
/// Produces the full diagnosis report for a task in `session_id`.
///
/// When `task_id` is `None`, the most recent failed task of the session is
/// diagnosed. Situations without usable data (no failed task, unparsable
/// task-end payload, empty timeline) are returned as `Ok` with an explanatory
/// message; only event-store failures propagate as `Err`.
///
/// The report is the formatted diagnosis followed by the harness-evaluation
/// section (taken from the latest `TaskEnd` event that carries one) and the
/// LLM telemetry section.
async fn diagnose(&self, session_id: &str, task_id: Option<&str>) -> anyhow::Result<String> {
    let resolved_task = match task_id {
        Some(explicit) => explicit.to_string(),
        None => {
            let latest_failed = self
                .event_store
                .query_recent_task_ends(session_id, true, 1)
                .await?;
            let Some(event) = latest_failed.first() else {
                return Ok(
                    "No failed task found in this session. Provide task_id or use action=list_tasks."
                        .to_string(),
                );
            };
            match event.parse_data::<TaskEndData>() {
                Ok(end) => end.task_id,
                Err(_) => return Ok("Could not resolve latest failed task ID.".to_string()),
            }
        }
    };

    let events = self
        .collect_task_timeline(session_id, &resolved_task)
        .await?;
    if events.is_empty() {
        return Ok(format!(
            "No events found for task `{}` in session `{}`.",
            resolved_task, session_id
        ));
    }

    // Deterministic analysis first, then let the LLM ranking be merged into it.
    let timeline_text = self.timeline_text(&events);
    let replay_summary = self.build_execution_replay_summary(&events);
    let deterministic = self.build_deterministic_analysis(&events, &replay_summary);
    let llm = self
        .llm_rank_candidates(&resolved_task, &timeline_text, &deterministic)
        .await;
    let merged = self.merge_llm_analysis(deterministic, llm, &events);

    // Render the individual sections in the order they are handed to the formatter.
    let execution_replay_section = self.format_execution_replay_section(&replay_summary);
    let route_health_alerts = self.build_route_health_alerts_section(session_id).await;
    let route_reason_trends = self.build_route_reason_trends_section(session_id).await;
    let context_scope_guards = self.build_context_scope_guards_section();
    let recovery_section = self
        .build_resume_recovery_section(session_id, &resolved_task, &events)
        .await;
    let diagnosis = self.format_diagnosis(
        &resolved_task,
        &merged,
        &execution_replay_section,
        &route_health_alerts,
        &route_reason_trends,
        &context_scope_guards,
        &recovery_section,
    );
    let llm_section = self.build_llm_telemetry_section(&resolved_task).await;

    // Walk the timeline backwards so the latest TaskEnd with a harness
    // evaluation wins.
    let harness_section = events
        .iter()
        .rev()
        .filter(|event| event.event_type == EventType::TaskEnd)
        .find_map(|event| {
            event
                .parse_data::<TaskEndData>()
                .ok()
                .and_then(|data| data.harness_eval)
        })
        .map(|eval| crate::harness_eval::report::format_diagnose_harness_section(&eval))
        .unwrap_or_default();

    Ok(format!("{diagnosis}{harness_section}{llm_section}"))
}
/// Renders the "LLM Calls" telemetry section for `task_id`.
///
/// Returns an empty string when the stats lookup fails or reports zero calls.
/// Otherwise lists call/attempt counts, latency figures, the final model (if
/// recorded), fallback count (if non-zero), token totals and, when estimate
/// samples exist, the drift between estimated and actual input tokens.
async fn build_llm_telemetry_section(&self, task_id: &str) -> String {
    let stats = match self.event_store.get_task_llm_stats(task_id).await {
        Ok(stats) if stats.total_calls > 0 => stats,
        _ => return String::new(),
    };

    let mut section = String::from("\n\n## LLM Calls\n\n");

    // More attempts than calls means at least one call was retried.
    let retry_suffix = if stats.total_attempts > stats.total_calls {
        " — retries occurred"
    } else {
        ""
    };
    section.push_str(&format!(
        "- Calls: {} (attempts: {}{})\n",
        stats.total_calls, stats.total_attempts, retry_suffix
    ));
    section.push_str(&format!(
        "- Latency: avg {}ms, p50 {}ms, p95 {}ms, max {}ms (slowest at iteration {})\n",
        stats.avg_latency_ms,
        stats.p50_latency_ms,
        stats.p95_latency_ms,
        stats.max_latency_ms,
        stats.max_latency_iteration,
    ));
    if let Some(model) = stats.final_model.as_ref() {
        section.push_str(&format!("- Final model: {model}\n"));
    }
    if stats.fell_back_count > 0 {
        section.push_str(&format!(
            "- ⚠️ Fallbacks: {} call(s) fell back to a different model/provider\n",
            stats.fell_back_count
        ));
    }
    section.push_str(&format!(
        "- Tokens: {} input, {} output\n",
        stats.total_input_tokens, stats.total_output_tokens,
    ));

    if stats.est_samples > 0 {
        let drift = stats.est_input_drift();
        let actual = stats.actual_input_tokens_with_est;
        // Avoid dividing by zero when no actual tokens were recorded.
        let pct = if actual > 0 {
            (drift as f64 / actual as f64) * 100.0
        } else {
            0.0
        };
        let direction = match drift.cmp(&0) {
            std::cmp::Ordering::Greater => "over-estimated",
            std::cmp::Ordering::Less => "under-estimated",
            std::cmp::Ordering::Equal => "exact",
        };
        section.push_str(&format!(
            "- Token estimate drift: est {} vs actual {} over {} call(s) — {} by {} ({:+.0}%)\n",
            stats.total_est_input_tokens,
            actual,
            stats.est_samples,
            direction,
            drift.abs(),
            pct,
        ));
    }

    section.push_str(
        "\n_Note: latency reflects the model/provider, not agent logic — check `final_model` before attributing slowness to the harness._\n",
    );
    section
}
/// Builds a short, comparable signature string for `event`.
///
/// Decision points, tool calls, tool results and task ends include key
/// payload fields; when such a payload fails to parse, a bare type tag is
/// used instead. Every other event type maps to its `as_str()` name.
fn signature_for_event(&self, event: &Event) -> String {
    match event.event_type {
        EventType::DecisionPoint => event
            .parse_data::<DecisionPointData>()
            .map(|decision| {
                format!(
                    "decision:{:?}:{}",
                    decision.decision_type,
                    truncate_str(&decision.summary, 40)
                )
            })
            .unwrap_or_else(|_| "decision".to_string()),
        EventType::ToolCall => event
            .parse_data::<ToolCallData>()
            .map(|call| format!("tool_call:{}", call.name))
            .unwrap_or_else(|_| "tool_call".to_string()),
        EventType::ToolResult => event
            .parse_data::<ToolResultData>()
            .map(|result| format!("tool_result:{}:{}", result.name, result.success))
            .unwrap_or_else(|_| "tool_result".to_string()),
        EventType::TaskEnd => event
            .parse_data::<TaskEndData>()
            .map(|end| format!("task_end:{}", end.status.as_str()))
            .unwrap_or_else(|_| "task_end".to_string()),
        _ => event.event_type.as_str().to_string(),
    }
}
/// Compares the event timelines of two tasks in `session_id`.
///
/// Events are compared position by position using `signature_for_event`.
/// The report names the first position where the signatures differ, or
/// states that no divergence exists. When nothing diverges, the message
/// distinguishes timelines with fully identical signatures from timelines
/// that differ only in extra tail events. The event at the divergence index
/// (or the first tail event) of each timeline is appended when present.
///
/// Returns `Ok` with an explanatory message when either timeline is empty;
/// event-store failures propagate as `Err`.
async fn compare(
    &self,
    session_id: &str,
    task_a: &str,
    task_b: &str,
) -> anyhow::Result<String> {
    let timeline_a = self.collect_task_timeline(session_id, task_a).await?;
    let timeline_b = self.collect_task_timeline(session_id, task_b).await?;
    if timeline_a.is_empty() || timeline_b.is_empty() {
        return Ok("One or both tasks have no events in this session.".to_string());
    }

    // `zip` stops at the shorter timeline and `position` stops at the first
    // mismatch, so signatures are only computed for the shared prefix.
    let divergence_at = timeline_a
        .iter()
        .zip(timeline_b.iter())
        .position(|(a, b)| self.signature_for_event(a) != self.signature_for_event(b));
    let shared_len = timeline_a.len().min(timeline_b.len());
    let idx = divergence_at.unwrap_or(shared_len);

    let mut out = format!(
        "**Task Comparison**\n\n- Session: {}\n- Task A: {} ({} events)\n- Task B: {} ({} events)\n",
        session_id,
        task_a,
        timeline_a.len(),
        task_b,
        timeline_b.len()
    );
    match divergence_at {
        Some(i) => {
            out.push_str(&format!("\nFirst divergence at position {}:\n", i + 1));
            out.push_str(&format!(
                "- A: {}\n- B: {}\n",
                redact_secrets(&self.brief_for_event(&timeline_a[i])),
                redact_secrets(&self.brief_for_event(&timeline_b[i]))
            ));
        }
        // Same length and no mismatch: there are no tail events to differ in.
        None if timeline_a.len() == timeline_b.len() => {
            out.push_str("\nNo divergence: both timelines have identical event signatures.\n");
        }
        None => {
            out.push_str("\nNo divergence in shared prefix; differences only in tail events.\n");
        }
    }
    if let Some(a) = timeline_a.get(idx) {
        out.push_str(&format!(
            "\nNext unique A event: {}",
            redact_secrets(&self.brief_for_event(a))
        ));
    }
    if let Some(b) = timeline_b.get(idx) {
        out.push_str(&format!(
            "\nNext unique B event: {}",
            redact_secrets(&self.brief_for_event(b))
        ));
    }
    Ok(out)
}
}
/// Structured diagnosis that `format_diagnosis` renders into the report.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct DeterministicAnalysis {
/// Text printed under the "What Failed" heading.
what_failed: String,
/// Root-cause candidates, printed in this order under "Most Likely Cause(s)".
candidates: Vec<RootCauseCandidate>,
/// Optional explanation for the "Why Previous Step Looked Valid" section;
/// left out of the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
why_previous_step_looked_valid: Option<String>,
/// Bullet items printed under "Minimal Fix".
minimal_fix: Vec<String>,
/// Bullet items printed under "Verification Steps".
verification_steps: Vec<String>,
}
/// Arguments of the `self_diagnose` tool, deserialized from the JSON string
/// passed to `Tool::call`. Every field except `action` may be omitted.
#[derive(Debug, Deserialize)]
struct DiagnoseArgs {
/// Requested operation: `list_tasks`, `timeline`, `diagnose` or `compare`.
action: String,
/// Target task for `timeline` (required there) and `diagnose` (optional).
#[serde(default)]
task_id: Option<String>,
/// First task for `compare`.
#[serde(default)]
task_id_a: Option<String>,
/// Second task for `compare`.
#[serde(default)]
task_id_b: Option<String>,
/// Limit for `list_tasks`; `call` substitutes 10 when absent.
#[serde(default)]
limit: Option<usize>,
/// Failure filter for `list_tasks`; `call` substitutes `true` when absent.
#[serde(default)]
failures_only: Option<bool>,
/// Session identifier; `call` rejects the request when it is missing or blank.
/// NOTE(review): not listed in the public schema — presumably injected by the
/// runtime; confirm against the caller.
#[serde(default)]
_session_id: Option<String>,
}
/// Diagnosis payload in which every field is optional and defaults to `None`
/// when absent. NOTE(review): presumably the shape parsed from the LLM's JSON
/// reply before it is merged with the deterministic analysis — the parsing
/// site is outside this section; confirm there.
#[derive(Debug, Deserialize)]
struct LlmDiagnosisResponse {
/// Optional replacement for the "what failed" summary.
#[serde(default)]
what_failed: Option<String>,
/// Optional list of root-cause candidates.
#[serde(default)]
candidates: Option<Vec<LlmCandidate>>,
/// Optional explanation of why the previous step looked valid.
#[serde(default)]
why_previous_step_looked_valid: Option<String>,
/// Optional list of minimal-fix steps.
#[serde(default)]
minimal_fix: Option<Vec<String>>,
/// Optional list of verification steps.
#[serde(default)]
verification_steps: Option<Vec<String>>,
}
/// One root-cause candidate inside an `LlmDiagnosisResponse`.
/// `category`, `confidence` and `description` are required when deserializing;
/// the remaining fields fall back to their defaults.
#[derive(Debug, Deserialize)]
struct LlmCandidate {
/// Failure category assigned to this candidate.
category: FailureCategory,
/// Confidence score. NOTE(review): presumably in the range 0.0..=1.0, as the
/// report multiplies candidate confidence by 100 — confirm.
confidence: f32,
/// Free-text description of the suspected cause.
description: String,
/// IDs of the events cited as evidence; empty when omitted.
#[serde(default)]
evidence_event_ids: Vec<i64>,
/// Optional explanation of why the previous step looked valid.
#[serde(default)]
why_previous_step_looked_valid: Option<String>,
}
/// Per-task checkpoint summary used when rendering the resume/recovery section
/// (previous checkpoint, last outputs, last error and resume linkage).
#[derive(Debug, Clone)]
struct ResumeCheckpointSnapshot {
/// Task this snapshot describes.
task_id: String,
/// Task description, if one was recorded.
description: Option<String>,
/// Original user request, if one was recorded.
original_user_message: Option<String>,
/// Task this one was resumed from, if any.
parent_task_id: Option<String>,
/// Task this one was resumed into, if any.
resumed_into_task_id: Option<String>,
/// Elapsed seconds, reported as "Elapsed before interruption" when present.
duration_secs: Option<u64>,
/// Iteration number shown in the "Previous checkpoint" line.
last_iteration: u32,
/// Tool-result count shown in the "Previous checkpoint" line.
tool_results_count: u32,
/// Pending tool-call IDs; only their count is shown in the checkpoint line.
pending_tool_call_ids: Vec<String>,
/// Summary of the last assistant output, if any.
last_assistant_summary: Option<String>,
/// Summary of the last tool result, if any.
last_tool_summary: Option<String>,
/// Last interruption or error message, if any.
last_error: Option<String>,
}
/// Module-local shorthand that forwards to `crate::utils::extract_json_object`
/// and returns its result unchanged.
fn extract_json_object(raw: &str) -> Option<String> {
crate::utils::extract_json_object(raw)
}
/// Exposes [`DiagnoseTool`] to the agent runtime as the `self_diagnose` tool.
#[async_trait]
impl Tool for DiagnoseTool {
    /// Tool identifier.
    fn name(&self) -> &str {
        "self_diagnose"
    }

    /// One-line summary of what the tool does.
    fn description(&self) -> &str {
        "Diagnose why an agent task failed using event timelines, decision points, evidence-linked root-cause ranking, and per-task LLM telemetry (latency, fallbacks, token-estimate drift)."
    }

    /// JSON schema describing the accepted arguments; only `action` is required.
    fn schema(&self) -> Value {
        json!({
            "name": "self_diagnose",
            "description": "Diagnose why a PREVIOUS agent task failed or behaved inefficiently. The diagnose action also reports per-task LLM telemetry (latency p50/p95/max, model fallbacks/retries, and est-vs-actual input-token drift) so you can tell whether slowness came from the model/provider vs the harness. ONLY use when the user explicitly asks why something failed/was slow or requests diagnostics. Do NOT call during normal task execution (code generation, file operations, etc.).",
            "parameters": {
                "type": "object",
                "properties": {
                    "action": {
                        "type":"string",
                        "enum":["list_tasks","timeline","diagnose","compare"]
                    },
                    "task_id": {
                        "type":"string",
                        "description":"Target task ID for timeline/diagnose."
                    },
                    "task_id_a": {
                        "type":"string",
                        "description":"First task ID for compare."
                    },
                    "task_id_b": {
                        "type":"string",
                        "description":"Second task ID for compare."
                    },
                    "limit": {
                        "type":"integer",
                        "description":"List limit (default 10, max 50)."
                    },
                    "failures_only": {
                        "type":"boolean",
                        "description":"For list_tasks, filter to failed tasks (default true)."
                    }
                },
                "required":["action"],
                "additionalProperties": false
            }
        })
    }

    /// Declares the tool as read-only, idempotent and free of approval requirements.
    fn capabilities(&self) -> ToolCapabilities {
        ToolCapabilities {
            read_only: true,
            idempotent: true,
            needs_approval: false,
            external_side_effect: false,
            high_impact_write: false,
        }
    }

    /// Parses `arguments` as [`DiagnoseArgs`] and dispatches on `action`.
    ///
    /// # Errors
    /// Fails when the JSON does not deserialize, when `_session_id` is missing
    /// or blank, when an action's required task ID is absent, or when the
    /// dispatched operation itself fails. An unknown action is not an error:
    /// it yields `Ok` with a message listing the valid actions.
    async fn call(&self, arguments: &str) -> anyhow::Result<String> {
        let args: DiagnoseArgs = serde_json::from_str(arguments)?;
        let session_id = match args._session_id.as_deref() {
            Some(id) if !id.trim().is_empty() => id,
            _ => {
                return Err(anyhow::anyhow!(
                    "self_diagnose requires internal _session_id"
                ))
            }
        };

        match args.action.as_str() {
            "list_tasks" => {
                let limit = args.limit.unwrap_or(10);
                let failures_only = args.failures_only.unwrap_or(true);
                self.list_tasks(session_id, limit, failures_only).await
            }
            "timeline" => {
                let Some(task_id) = args.task_id.as_deref() else {
                    return Err(anyhow::anyhow!("task_id is required for action=timeline"));
                };
                self.timeline(session_id, task_id).await
            }
            "diagnose" => self.diagnose(session_id, args.task_id.as_deref()).await,
            "compare" => {
                // `task_id_a` is validated before `task_id_b`, so a request
                // missing both reports the first one.
                let Some(first) = args.task_id_a.as_deref() else {
                    return Err(anyhow::anyhow!("task_id_a is required for action=compare"));
                };
                let Some(second) = args.task_id_b.as_deref() else {
                    return Err(anyhow::anyhow!("task_id_b is required for action=compare"));
                };
                self.compare(session_id, first, second).await
            }
            other => Ok(format!(
                "Unknown action: '{}'. Use list_tasks, timeline, diagnose, or compare.",
                other
            )),
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::{Duration, Utc};
use crate::config::ProviderKind;
use crate::events::{DecisionPointData, TaskStatus};
use crate::llm_runtime::SharedLlmRuntime;
use crate::memory::embeddings::EmbeddingService;
use crate::state::SqliteStateStore;
use crate::traits::{ModelProvider, TokenUsage};
/// Test double for `ModelProvider` that always answers with a fixed string.
struct MockProvider {
/// Canned reply returned as the response content of every `chat` call.
content: String,
}
// `chat` ignores its model, messages and tools and always succeeds with the
// canned `content`, no tool calls and a fixed 1-in/1-out token usage;
// `list_models` reports the single model name "mock-fast".
#[async_trait]
impl ModelProvider for MockProvider {
async fn chat(
&self,
_model: &str,
_messages: &[Value],
_tools: &[Value],
) -> anyhow::Result<ProviderResponse> {
Ok(ProviderResponse {
content: Some(self.content.clone()),
tool_calls: vec![],
usage: Some(TokenUsage {
input_tokens: 1,
output_tokens: 1,
cached_input_tokens: None,
cache_creation_input_tokens: None,
model: "mock".to_string(),
}),
thinking: None,
response_note: None,
})
}
async fn list_models(&self) -> anyhow::Result<Vec<String>> {
Ok(vec!["mock-fast".to_string()])
}
}
// Builds a `DiagnoseTool` backed by a fresh temporary SQLite database and a
// `MockProvider` whose canned reply is a JSON diagnosis with one
// `tool_failure` candidate citing event ID 3.
async fn setup_tool() -> DiagnoseTool {
let db_file = tempfile::NamedTempFile::new().expect("temp db file");
let db_path = db_file.path().to_string_lossy().to_string();
let embedding_service = Arc::new(EmbeddingService::new().unwrap());
let state = Arc::new(
SqliteStateStore::new(&db_path, 50, None, embedding_service)
.await
.expect("state"),
);
// The handle is forgotten so its destructor never runs — presumably to keep
// the temp file on disk while the store is in use; confirm against tempfile docs.
std::mem::forget(db_file);
let event_store = Arc::new(EventStore::new(state.pool()).await.expect("event store"));
let provider: Arc<dyn ModelProvider> = Arc::new(MockProvider {
content: json!({
"what_failed":"Tool execution failed",
"candidates":[
{"category":"tool_failure","confidence":0.8,"description":"Tool failed","evidence_event_ids":[3]}
],
"minimal_fix":["Fix tool arguments"],
"verification_steps":["Re-run and confirm success"]
})
.to_string(),
});
let llm_runtime = SharedLlmRuntime::new(
provider,
None,
ProviderKind::OpenaiCompatible,
"mock-fast".to_string(),
);
// NOTE(review): the trailing `200, false` presumably map to `max_events` and
// `include_raw_args` — confirm against `DiagnoseTool::new`.
DiagnoseTool::new(
event_store,
state as Arc<dyn TokenUsageStore>,
llm_runtime,
200,
false,
)
}
// Test helper: stores one event of `event_type` in `session_id`, stamped with
// the current time, carrying `data` as its JSON payload and `task_id` as its
// task. Panics if serialization or the append fails.
async fn append_event<T>(
    tool: &DiagnoseTool,
    session_id: &str,
    event_type: EventType,
    data: T,
    task_id: Option<&str>,
) where
    T: serde::Serialize,
{
    let payload = serde_json::to_value(data).unwrap();
    let mut record = crate::events::Event::new(session_id.to_string(), event_type, payload);
    record.task_id = task_id.map(|id| id.to_string());
    record.created_at = Utc::now();
    tool.event_store.append(record).await.expect("append");
}
// Test helper: like `append_event`, but the event is stamped with the
// caller-supplied `created_at` so tests can place events in the past.
// Panics if serialization or the append fails.
async fn append_event_at<T>(
    tool: &DiagnoseTool,
    session_id: &str,
    event_type: EventType,
    data: T,
    task_id: Option<&str>,
    created_at: chrono::DateTime<Utc>,
) where
    T: serde::Serialize,
{
    let payload = serde_json::to_value(data).unwrap();
    let mut record = crate::events::Event::new(session_id.to_string(), event_type, payload);
    record.task_id = task_id.map(|id| id.to_string());
    record.created_at = created_at;
    tool.event_store.append(record).await.expect("append");
}
// Two failed tasks are stored in different sessions (`s1`/`t1`, `s2`/`t2`);
// listing tasks for `s1` must mention `t1` and must not mention `t2`.
#[tokio::test]
async fn test_list_tasks() {
let tool = setup_tool().await;
append_event(
&tool,
"s1",
EventType::TaskEnd,
TaskEndData {
task_id: "t1".to_string(),
status: TaskStatus::Failed,
outcome: Some(crate::events::TaskOutcome::Failed),
duration_secs: 3,
iterations: 1,
tool_calls_count: 2,
error: Some("boom".to_string()),
summary: None,
efficiency: None,
turn_id: None,
harness_eval: None,
},
Some("t1"),
)
.await;
// Same shape of failure, but recorded under a different session.
append_event(
&tool,
"s2",
EventType::TaskEnd,
TaskEndData {
task_id: "t2".to_string(),
status: TaskStatus::Failed,
outcome: Some(crate::events::TaskOutcome::Failed),
duration_secs: 3,
iterations: 1,
tool_calls_count: 2,
error: Some("other".to_string()),
summary: None,
efficiency: None,
turn_id: None,
harness_eval: None,
},
Some("t2"),
)
.await;
let res = tool
.call(r#"{"action":"list_tasks","_session_id":"s1"}"#)
.await
.unwrap();
assert!(res.contains("t1"));
assert!(!res.contains("t2"));
}
// A task start followed by an intent-gate decision point must produce a
// timeline that carries the "Task Timeline" title and a ">>> DECISION" marker.
#[tokio::test]
async fn test_timeline_reconstruction_and_decision_points() {
let tool = setup_tool().await;
append_event(
&tool,
"s1",
EventType::TaskStart,
TaskStartData {
task_id: "t1".to_string(),
description: "Test".to_string(),
parent_task_id: None,
user_message: None,
turn_id: None,
},
Some("t1"),
)
.await;
append_event(
&tool,
"s1",
EventType::DecisionPoint,
DecisionPointData {
decision_type: DecisionType::IntentGate,
task_id: "t1".to_string(),
iteration: 1,
severity: crate::events::DiagnosticSeverity::Info,
code: Some("intent_gate".to_string()),
metadata: json!({"needs_tools":true}),
summary: "intent gate forced tool mode".to_string(),
},
Some("t1"),
)
.await;
let res = tool
.call(r#"{"action":"timeline","task_id":"t1","_session_id":"s1"}"#)
.await
.unwrap();
assert!(res.contains("Task Timeline"));
assert!(res.contains(">>> DECISION"));
}
// A task with a start, one tool error and a failed end must yield a diagnosis
// report containing the task heading and the "Most Likely Cause(s)" section.
#[tokio::test]
async fn test_diagnose_finds_first_error() {
let tool = setup_tool().await;
append_event(
&tool,
"s1",
EventType::TaskStart,
TaskStartData {
task_id: "t1".to_string(),
description: "Run".to_string(),
parent_task_id: None,
user_message: None,
turn_id: None,
},
Some("t1"),
)
.await;
append_event(
&tool,
"s1",
EventType::Error,
ErrorData::tool_error(
"terminal",
"No such file or directory",
Some("t1".to_string()),
),
Some("t1"),
)
.await;
append_event(
&tool,
"s1",
EventType::TaskEnd,
TaskEndData {
task_id: "t1".to_string(),
status: TaskStatus::Failed,
outcome: Some(crate::events::TaskOutcome::Failed),
duration_secs: 2,
iterations: 1,
tool_calls_count: 1,
error: Some("failed".to_string()),
summary: None,
efficiency: None,
turn_id: None,
harness_eval: None,
},
Some("t1"),
)
.await;
let res = tool
.call(r#"{"action":"diagnose","task_id":"t1","_session_id":"s1"}"#)
.await
.unwrap();
assert!(res.contains("Diagnosis for Task t1"));
assert!(res.contains("Most Likely Cause(s)"));
}
// A failed task whose timeline contains a rejected plan, an evidence-gate
// block and a pending post-execution validation must produce an "Execution
// Replay Summary" section that names all four replay categories.
#[tokio::test]
async fn test_diagnose_includes_execution_replay_summary() {
let tool = setup_tool().await;
append_event(
&tool,
"s1",
EventType::TaskStart,
TaskStartData {
task_id: "t-replay".to_string(),
description: "Deploy app".to_string(),
parent_task_id: None,
user_message: None,
turn_id: None,
},
Some("t-replay"),
)
.await;
// Iteration 1: the pre-execution plan is rejected.
append_event(
&tool,
"s1",
EventType::DecisionPoint,
DecisionPointData {
decision_type: DecisionType::ExecutionPlanningGate,
task_id: "t-replay".to_string(),
iteration: 1,
severity: crate::events::DiagnosticSeverity::Warning,
code: Some("plan_rejected".to_string()),
metadata: json!({
"condition": "plan_rejected",
"gate_result": "rejected",
"tool": "edit_file",
"target_hint": "src/main.rs",
"reason": "first action mutated the wrong file",
"loop_repetition_reason": "plan_rejected"
}),
summary: "Structured pre-execution plan rejected".to_string(),
},
Some("t-replay"),
)
.await;
// Iteration 2: the evidence gate blocks the edit.
append_event(
&tool,
"s1",
EventType::DecisionPoint,
DecisionPointData {
decision_type: DecisionType::EvidenceGate,
task_id: "t-replay".to_string(),
iteration: 2,
severity: crate::events::DiagnosticSeverity::Warning,
code: Some("missing_pre_execution_evidence".to_string()),
metadata: json!({
"condition": "missing_pre_execution_evidence",
"tool": "edit_file",
"required_evidence_kind": "file_read",
"target": "src/main.rs",
"loop_repetition_reason": "missing_evidence"
}),
summary: "Evidence gate blocked edit".to_string(),
},
Some("t-replay"),
)
.await;
// Iteration 3: post-execution validation still asks for verification.
append_event(
&tool,
"s1",
EventType::DecisionPoint,
DecisionPointData {
decision_type: DecisionType::PostExecutionValidation,
task_id: "t-replay".to_string(),
iteration: 3,
severity: crate::events::DiagnosticSeverity::Warning,
code: Some("verification_pending".to_string()),
metadata: json!({
"outcome": "verify_again",
"reason": "verification_pending",
"loop_repetition_reason": "verification_pending"
}),
summary: "Verification still required".to_string(),
},
Some("t-replay"),
)
.await;
append_event(
&tool,
"s1",
EventType::TaskEnd,
TaskEndData {
task_id: "t-replay".to_string(),
status: TaskStatus::Failed,
outcome: Some(crate::events::TaskOutcome::Failed),
duration_secs: 3,
iterations: 3,
tool_calls_count: 0,
error: Some("blocked".to_string()),
summary: None,
efficiency: None,
turn_id: None,
harness_eval: None,
},
Some("t-replay"),
)
.await;
let res = tool
.call(r#"{"action":"diagnose","task_id":"t-replay","_session_id":"s1"}"#)
.await
.unwrap();
assert!(res.contains("### Execution Replay Summary"));
assert!(res.contains("Plan revisions"));
assert!(res.contains("Evidence gate triggers"));
assert!(res.contains("Validation failures"));
assert!(res.contains("Retry reasons"));
// NOTE(review): the space-separated forms suggest the replay formatter
// humanizes the snake_case reason codes — confirm in the formatter.
assert!(res.contains("plan rejected"));
assert!(res.contains("verification pending"));
}
// Two intent-gate decisions with different route reasons/actions are recorded
// for a failed task; the diagnosis must contain both route sections, both
// route reasons and the "Action mix:" line.
#[tokio::test]
async fn test_diagnose_includes_recent_route_reason_trends() {
let tool = setup_tool().await;
append_event(
&tool,
"s1",
EventType::TaskStart,
TaskStartData {
task_id: "t1".to_string(),
description: "Run".to_string(),
parent_task_id: None,
user_message: None,
turn_id: None,
},
Some("t1"),
)
.await;
// First decision: routed to tools and execution continues.
append_event(
&tool,
"s1",
EventType::DecisionPoint,
DecisionPointData {
decision_type: DecisionType::IntentGate,
task_id: "t1".to_string(),
iteration: 1,
severity: crate::events::DiagnosticSeverity::Info,
code: Some("tools_required".to_string()),
metadata: json!({
"needs_tools": true,
"route_reason": "tools_required",
"route_action": "continue"
}),
summary: "Intent gate routed to tools".to_string(),
},
Some("t1"),
)
.await;
// Second decision: a non-empty direct reply is returned.
append_event(
&tool,
"s1",
EventType::DecisionPoint,
DecisionPointData {
decision_type: DecisionType::IntentGate,
task_id: "t1".to_string(),
iteration: 2,
severity: crate::events::DiagnosticSeverity::Info,
code: Some("acknowledgment_direct_reply".to_string()),
metadata: json!({
"needs_tools": false,
"route_reason": "acknowledgment_direct_reply",
"route_action": "return",
"route_reply_len": 7
}),
summary: "Intent gate returned direct acknowledgment".to_string(),
},
Some("t1"),
)
.await;
append_event(
&tool,
"s1",
EventType::Error,
ErrorData::tool_error(
"terminal",
"No such file or directory",
Some("t1".to_string()),
),
Some("t1"),
)
.await;
append_event(
&tool,
"s1",
EventType::TaskEnd,
TaskEndData {
task_id: "t1".to_string(),
status: TaskStatus::Failed,
outcome: Some(crate::events::TaskOutcome::Failed),
duration_secs: 2,
iterations: 2,
tool_calls_count: 1,
error: Some("failed".to_string()),
summary: None,
efficiency: None,
turn_id: None,
harness_eval: None,
},
Some("t1"),
)
.await;
let res = tool
.call(r#"{"action":"diagnose","task_id":"t1","_session_id":"s1"}"#)
.await
.unwrap();
assert!(res.contains("### Route Health Alerts"));
assert!(res.contains("### Recent Route Reason Trends"));
assert!(res.contains("tools_required"));
assert!(res.contains("acknowledgment_direct_reply"));
assert!(res.contains("Action mix:"));
}
// Diagnosing a minimal failed task must include the "Context/Scope Guardrails"
// section together with at least two of its counter names. Counter values are
// not asserted.
#[tokio::test]
async fn test_diagnose_includes_context_scope_guardrails_section() {
let tool = setup_tool().await;
append_event(
&tool,
"s1",
EventType::TaskStart,
TaskStartData {
task_id: "t-ctx".to_string(),
description: "Run".to_string(),
parent_task_id: None,
user_message: None,
turn_id: None,
},
Some("t-ctx"),
)
.await;
append_event(
&tool,
"s1",
EventType::Error,
ErrorData::tool_error("terminal", "boom", Some("t-ctx".to_string())),
Some("t-ctx"),
)
.await;
append_event(
&tool,
"s1",
EventType::TaskEnd,
TaskEndData {
task_id: "t-ctx".to_string(),
status: TaskStatus::Failed,
outcome: Some(crate::events::TaskOutcome::Failed),
duration_secs: 1,
iterations: 1,
tool_calls_count: 0,
error: Some("failed".to_string()),
summary: None,
efficiency: None,
turn_id: None,
harness_eval: None,
},
Some("t-ctx"),
)
.await;
let res = tool
.call(r#"{"action":"diagnose","task_id":"t-ctx","_session_id":"s1"}"#)
.await
.unwrap();
assert!(res.contains("### Context/Scope Guardrails"));
assert!(res.contains("context_bleed_prevented_total"));
assert!(res.contains("cross_scope_blocked_total"));
}
#[tokio::test]
async fn test_diagnose_route_health_alerts_detect_empty_reply_and_drift() {
let tool = setup_tool().await;
append_event(
&tool,
"s1",
EventType::TaskStart,
TaskStartData {
task_id: "t1".to_string(),
description: "Run".to_string(),
parent_task_id: None,
user_message: None,
turn_id: None,
},
Some("t1"),
)
.await;
append_event(
&tool,
"s1",
EventType::Error,
ErrorData::tool_error(
"terminal",
"No such file or directory",
Some("t1".to_string()),
),
Some("t1"),
)
.await;
append_event(
&tool,
"s1",
EventType::TaskEnd,
TaskEndData {
task_id: "t1".to_string(),
status: TaskStatus::Failed,
outcome: Some(crate::events::TaskOutcome::Failed),
duration_secs: 2,
iterations: 1,
tool_calls_count: 1,
error: Some("failed".to_string()),
summary: None,
efficiency: None,
turn_id: None,
harness_eval: None,
},
Some("t1"),
)
.await;
let baseline_start = Utc::now() - Duration::days(3);
for i in 0..40 {
let reason = if i < 4 {
"tools_required"
} else if i < 6 {
"clarification_required"
} else {
"default_continue"
};
let action = if reason == "clarification_required" {
"return"
} else {
"continue"
};
let reply_len = if action == "return" {
Some(24u64)
} else {
None
};
append_event_at(
&tool,
"s1",
EventType::DecisionPoint,
DecisionPointData {
decision_type: DecisionType::IntentGate,
task_id: "t1".to_string(),
iteration: (i + 1) as u32,
severity: crate::events::DiagnosticSeverity::Info,
code: Some(reason.to_string()),
metadata: json!({
"route_reason": reason,
"route_action": action,
"route_reply_len": reply_len
}),
summary: format!("Baseline route {reason}"),
},
Some("t1"),
baseline_start + Duration::seconds(i as i64),
)
.await;
}
let recent_start = Utc::now() - Duration::minutes(30);
for i in 0..30 {
let (reason, action, reply_len) = if i >= 18 {
("clarification_required", "return", Some(28u64))
} else if i == 10 {
("acknowledgment_direct_reply", "return", Some(0u64))
} else {
("tools_required", "continue", None)
};
append_event_at(
&tool,
"s1",
EventType::DecisionPoint,
DecisionPointData {
decision_type: DecisionType::IntentGate,
task_id: "t1".to_string(),
iteration: (100 + i) as u32,
severity: crate::events::DiagnosticSeverity::Info,
code: Some(reason.to_string()),
metadata: json!({
"route_reason": reason,
"route_action": action,
"route_reply_len": reply_len
}),
summary: format!("Recent route {reason}"),
},
Some("t1"),
recent_start + Duration::seconds(i as i64),
)
.await;
}
let res = tool
.call(r#"{"action":"diagnose","task_id":"t1","_session_id":"s1"}"#)
.await
.unwrap();
assert!(res.contains("### Route Health Alerts"));
assert!(res.contains("Empty direct reply detected"));
assert!(res.contains("`tools_required` rate drifted"));
assert!(res.contains("`clarification_required` spiked"));
}
/// Records an interrupted parent task followed by a child task that names it
/// as `parent_task_id`, then checks that diagnosing the child reports the
/// "Resume Recovery State" section with the parent link, the parent's
/// iteration-3 checkpoint and the forward link to the child.
#[tokio::test]
async fn test_diagnose_includes_resume_recovery_state() {
    const SESSION: &str = "s1";
    const PARENT: &str = "parent-1";
    const CHILD: &str = "child-1";

    let tool = setup_tool().await;

    // --- Parent task: starts, reaches iteration 3, emits a tool call, fails.
    let parent_start = TaskStartData {
        task_id: PARENT.to_owned(),
        description: "Build website and deploy".to_owned(),
        parent_task_id: None,
        user_message: Some("Build website and deploy".to_owned()),
        turn_id: None,
    };
    append_event(&tool, SESSION, EventType::TaskStart, parent_start, Some(PARENT)).await;

    let parent_thinking = ThinkingStartData {
        iteration: 3,
        task_id: PARENT.to_owned(),
        total_tool_calls: 2,
    };
    append_event(
        &tool,
        SESSION,
        EventType::ThinkingStart,
        parent_thinking,
        Some(PARENT),
    )
    .await;

    // No ToolResult event is appended for this call anywhere in the test.
    let pending_call = crate::events::ToolCallInfo {
        id: "call-parent-pending".to_owned(),
        name: "system_info".to_owned(),
        arguments: json!({}),
        extra_content: None,
    };
    let parent_response = AssistantResponseData {
        message_id: None,
        content: Some("I'll continue by checking deployment config.".to_owned()),
        tool_calls: Some(vec![pending_call]),
        model: "mock-fast".to_owned(),
        input_tokens: None,
        output_tokens: None,
        annotations: Vec::new(),
        turn_id: None,
    };
    append_event(
        &tool,
        SESSION,
        EventType::AssistantResponse,
        parent_response,
        Some(PARENT),
    )
    .await;

    let parent_end = TaskEndData {
        task_id: PARENT.to_owned(),
        status: TaskStatus::Failed,
        outcome: Some(crate::events::TaskOutcome::Failed),
        duration_secs: 42,
        iterations: 3,
        tool_calls_count: 1,
        error: Some(
            "Agent process interrupted before completion. Resumed in task child-1.".to_owned(),
        ),
        summary: Some("Recovered from checkpoint after interruption".to_owned()),
        efficiency: None,
        turn_id: None,
        harness_eval: None,
    };
    append_event(&tool, SESSION, EventType::TaskEnd, parent_end, Some(PARENT)).await;

    // --- Child task: declares the parent, then fails as well.
    let child_start = TaskStartData {
        task_id: CHILD.to_owned(),
        description: "resume: Build website and deploy".to_owned(),
        parent_task_id: Some(PARENT.to_owned()),
        user_message: Some("continue".to_owned()),
        turn_id: None,
    };
    append_event(&tool, SESSION, EventType::TaskStart, child_start, Some(CHILD)).await;

    let child_end = TaskEndData {
        task_id: CHILD.to_owned(),
        status: TaskStatus::Failed,
        outcome: Some(crate::events::TaskOutcome::Failed),
        duration_secs: 10,
        iterations: 1,
        tool_calls_count: 0,
        error: Some("still failing".to_owned()),
        summary: None,
        efficiency: None,
        turn_id: None,
        harness_eval: None,
    };
    append_event(&tool, SESSION, EventType::TaskEnd, child_end, Some(CHILD)).await;

    let request = r#"{"action":"diagnose","task_id":"child-1","_session_id":"s1"}"#;
    let report = tool.call(request).await.unwrap();

    assert!(report.contains("### Resume Recovery State"));
    assert!(report.contains("Resumed from previous task: `parent-1`"));
    assert!(report.contains("Previous checkpoint: iteration 3"));
    assert!(report.contains("Previous task resumed into: `child-1`"));
}
/// Stores a tool result whose text embeds an `sk-...` style token and checks
/// that the `timeline` action output omits the token prefix and contains a
/// `REDACTED` marker.
#[tokio::test]
async fn test_pii_redaction_in_timeline() {
    const SESSION: &str = "s1";
    const TASK: &str = "t1";

    let tool = setup_tool().await;

    let leaky_result = ToolResultData {
        message_id: None,
        tool_call_id: "c1".to_owned(),
        name: "web_fetch".to_owned(),
        result: "token sk-abc123456789012345678901234567890 leaked".to_owned(),
        success: false,
        duration_ms: 10,
        error: None,
        task_id: Some(TASK.to_owned()),
        annotations: Vec::new(),
        turn_id: None,
        attachments: Vec::new(),
    };
    append_event(&tool, SESSION, EventType::ToolResult, leaky_result, Some(TASK)).await;

    let request = r#"{"action":"timeline","task_id":"t1","_session_id":"s1"}"#;
    let timeline = tool.call(request).await.unwrap();

    // The token prefix must be gone and a redaction marker must be present.
    assert!(!timeline.contains("sk-abc"));
    assert!(timeline.contains("REDACTED"));
}
}