Skip to main content

tandem_server/bug_monitor/
service.rs

1use anyhow::Result;
2use serde_json::{Map, Value};
3
4use crate::bug_monitor::types::BugMonitorIncidentRecord;
5use crate::bug_monitor::types::{
6    BugMonitorConfig, BugMonitorQualityGateReport, BugMonitorQualityGateResult,
7    BugMonitorSubmission,
8};
9use crate::EngineEvent;
10use crate::{
11    app::state::{sha256_hex, truncate_text, AppState},
12    now_ms,
13};
14
/// Absolute deadline, in milliseconds, for a triage run.
///
/// The deadline is the start timestamp plus the timeout. If the sum does not
/// fit in a `u64` it is pinned to `u64::MAX` rather than wrapping around.
fn bug_monitor_triage_timeout_deadline_ms(created_at_ms: u64, timeout_ms: u64) -> u64 {
    match created_at_ms.checked_add(timeout_ms) {
        Some(deadline_ms) => deadline_ms,
        None => u64::MAX,
    }
}
18
/// Id prefix that identifies an event as coming from the bug monitor's own
/// triage workflow. `recursive_triage_skip_reason` tests the event's
/// automation/workflow id against it with `starts_with`.
19const BUG_MONITOR_TRIAGE_AUTOMATION_PREFIX: &str = "automation-v2-bug-monitor-triage-";
/// Agent role of the triage agent. `recursive_triage_skip_reason` compares the
/// event's agent role against it ASCII case-insensitively, and only when the
/// event carries no automation/workflow id.
20const BUG_MONITOR_TRIAGE_AGENT_ROLE: &str = "bug_monitor_triage_agent";
21
22/// Strip per-run identifiers from a failure reason before fingerprinting
23/// so that recurrences of the same logical failure dedup correctly.
24///
25/// Some node failures embed the run ID in the reason text — e.g.
26/// `required output ` `` `.tandem/runs/automation-v2-run-<uuid>/artifacts/foo.json` ``
27/// `was not created` — and excluding `run_id` from the hash inputs is
28/// not enough on its own because the same UUID leaks back in via the
29/// reason string. Replace any `automation-v2-run-<uuid>` and any bare
30/// UUID with a stable placeholder. Numeric values (timeouts, attempt
31/// counts, byte sizes) are intentionally left alone: `timed out after
32/// 180000 ms` and `timed out after 600000 ms` are genuinely different
33/// failure shapes and should not collapse to one incident.
34fn normalize_reason_for_fingerprint(reason: &str) -> String {
35    let after_run_ids = automation_run_id_regex().replace_all(reason, "automation-v2-run-RUNID");
36    uuid_regex()
37        .replace_all(after_run_ids.as_ref(), "UUID")
38        .into_owned()
39}
40
41fn node_id_from_failure_reason(reason: &str) -> Option<String> {
42    for regex in [
43        node_outcomes_reason_regex(),
44        automation_node_timeout_reason_regex(),
45    ] {
46        if let Some(captures) = regex.captures(reason) {
47            let value = captures
48                .get(1)
49                .map(|match_| match_.as_str())
50                .unwrap_or_default()
51                .trim()
52                .trim_matches('`')
53                .trim();
54            if !value.is_empty() {
55                return Some(value.to_string());
56            }
57        }
58    }
59    None
60}
61
62fn is_node_outcomes_reason(reason: &str) -> bool {
63    node_outcomes_reason_regex().is_match(reason)
64}
65
66fn node_outcomes_reason_regex() -> &'static regex::Regex {
67    static REGEX: std::sync::OnceLock<regex::Regex> = std::sync::OnceLock::new();
68    REGEX.get_or_init(|| {
69        regex::Regex::new(r"(?i)node outcomes:\s*`?([A-Za-z0-9_.:-]+)`?")
70            .expect("node outcomes reason regex")
71    })
72}
73
74fn node_incident_matches_aggregate_outcome(
75    incident: &BugMonitorIncidentRecord,
76    repo: &str,
77    workspace_root: &str,
78    event_type: &str,
79    workflow_id: &str,
80    run_id: &str,
81    node_id: &str,
82) -> bool {
83    if incident.repo != repo
84        || incident.workspace_root != workspace_root
85        || incident.event_type != event_type
86        || incident.run_id.as_deref() != Some(run_id)
87        || incident.fingerprint.trim().is_empty()
88    {
89        return false;
90    }
91    let Some(payload) = incident.event_payload.as_ref() else {
92        return false;
93    };
94    let incident_workflow_id = first_string_deep(
95        payload,
96        &["workflow_id", "workflowID", "automation_id", "automationID"],
97    );
98    if incident_workflow_id.as_deref() != Some(workflow_id) {
99        return false;
100    }
101    let incident_node_id = first_string_deep(
102        payload,
103        &[
104            "node_id", "nodeID", "task_id", "taskID", "stage_id", "stageID",
105        ],
106    );
107    if incident_node_id.as_deref() != Some(node_id) {
108        return false;
109    }
110    let incident_reason = first_string_deep(payload, &["reason", "error", "message"]);
111    !incident_reason
112        .as_deref()
113        .is_some_and(is_node_outcomes_reason)
114}
115
116async fn existing_node_incident_fingerprint_for_aggregate_outcome(
117    state: &AppState,
118    repo: &str,
119    workspace_root: &str,
120    event_type: &str,
121    workflow_id: Option<&str>,
122    run_id: Option<&str>,
123    node_id: Option<&str>,
124    reason: Option<&str>,
125) -> Option<String> {
126    if !reason.is_some_and(is_node_outcomes_reason) {
127        return None;
128    }
129    let workflow_id = workflow_id?;
130    let run_id = run_id?;
131    let node_id = node_id?;
132    let mut rows = state
133        .bug_monitor_incidents
134        .read()
135        .await
136        .values()
137        .filter(|incident| {
138            node_incident_matches_aggregate_outcome(
139                incident,
140                repo,
141                workspace_root,
142                event_type,
143                workflow_id,
144                run_id,
145                node_id,
146            )
147        })
148        .cloned()
149        .collect::<Vec<_>>();
150    rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
151    rows.into_iter().next().map(|incident| incident.fingerprint)
152}
153
154fn automation_node_timeout_reason_regex() -> &'static regex::Regex {
155    static REGEX: std::sync::OnceLock<regex::Regex> = std::sync::OnceLock::new();
156    REGEX.get_or_init(|| {
157        regex::Regex::new(r"(?i)automation node\s+`([^`]+)`\s+timed out")
158            .expect("automation node timeout reason regex")
159    })
160}
161
162fn automation_run_id_regex() -> &'static regex::Regex {
163    static REGEX: std::sync::OnceLock<regex::Regex> = std::sync::OnceLock::new();
164    REGEX.get_or_init(|| {
165        regex::Regex::new(
166            r"automation-v2-run-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}",
167        )
168        .expect("automation run id regex")
169    })
170}
171
172fn uuid_regex() -> &'static regex::Regex {
173    static REGEX: std::sync::OnceLock<regex::Regex> = std::sync::OnceLock::new();
174    REGEX.get_or_init(|| {
175        regex::Regex::new(r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}")
176            .expect("uuid regex")
177    })
178}
179
180/// Returns a human-readable reason if this event was emitted by the
181/// bug monitor's own triage workflow. Used to short-circuit
182/// `process_event` so a triage failure doesn't recursively trigger
183/// another triage.
184///
185/// The canonical signal is the `automation-v2-bug-monitor-triage-`
186/// `automation_id` prefix set by `bug_monitor_triage_spec`. The
187/// `agent_role == "bug_monitor_triage_agent"` check is a backstop
188/// for events that arrive without any automation/workflow id at all
189/// — without that gate, a user's custom automation that happens to
190/// use the same agent_id string would be silently excluded from
191/// bug monitoring (caught by Codex on PR #53).
192fn recursive_triage_skip_reason(event: &EngineEvent) -> Option<String> {
193    let automation_id = first_string_deep(
194        &event.properties,
195        &["automation_id", "automationID", "workflow_id", "workflowID"],
196    );
197    if let Some(id) = automation_id.as_deref() {
198        if id.starts_with(BUG_MONITOR_TRIAGE_AUTOMATION_PREFIX) {
199            return Some(format!(
200                "automation_id={id} originates from bug monitor triage"
201            ));
202        }
203        // automation_id is present but doesn't have the triage prefix
204        // — this is a normal user workflow failure even if the agent
205        // role string happens to match. Don't fall through to the
206        // agent_role backstop.
207        return None;
208    }
209    let agent_role = first_string_deep(&event.properties, &["agent_role", "agentRole"]);
210    if agent_role
211        .as_deref()
212        .is_some_and(|role| role.eq_ignore_ascii_case(BUG_MONITOR_TRIAGE_AGENT_ROLE))
213    {
214        return Some(format!(
215            "agent_role={} is the bug monitor triage agent",
216            agent_role.unwrap_or_default()
217        ));
218    }
219    None
220}
221
222/// Build a multi-line `last_post_error` describing why the triage run
223/// missed its deadline. The first line is the original short message
224/// (preserving backwards compat for any consumer that reads the first
225/// line). Subsequent lines are the structured diagnostics, suitable
226/// for embedding in the GitHub issue body's "Triage timeout details"
227/// section. When diagnostics could not be loaded (run state missing
228/// or corrupt), the message degrades gracefully to the single-line
229/// pre-diagnostics format.
230fn compose_triage_timeout_last_post_error(
231    triage_run_id: &str,
232    timeout_ms: u64,
233    diagnostics: Option<&serde_json::Value>,
234) -> String {
235    let head =
236        format!("triage run {triage_run_id} did not reach a terminal status within {timeout_ms}ms");
237    match diagnostics {
238        Some(value) => {
239            let detail =
240                crate::http::context_runs::format_bug_monitor_triage_timeout_diagnostics(value);
241            if detail.trim().is_empty() {
242                head
243            } else {
244                format!("{head}\n{detail}")
245            }
246        }
247        None => head,
248    }
249}
250
251fn draft_has_github_issue(draft: &crate::BugMonitorDraftRecord) -> bool {
252    draft.issue_number.is_some() || draft.github_issue_url.is_some()
253}
254
255fn draft_is_triage_timed_out(draft: &crate::BugMonitorDraftRecord) -> bool {
256    draft
257        .github_status
258        .as_deref()
259        .is_some_and(|status| status.eq_ignore_ascii_case("triage_timed_out"))
260}
261
262async fn bug_monitor_incident_for_draft(
263    state: &AppState,
264    draft_id: &str,
265    triage_run_id: &str,
266) -> Option<String> {
267    let incidents = state.bug_monitor_incidents.read().await;
268    incidents
269        .values()
270        .find(|incident| {
271            incident.draft_id.as_deref() == Some(draft_id)
272                || incident.triage_run_id.as_deref() == Some(triage_run_id)
273        })
274        .map(|incident| incident.incident_id.clone())
275}
276
277pub async fn recover_overdue_bug_monitor_triage_runs(
278    state: &AppState,
279) -> anyhow::Result<Vec<(String, Option<String>)>> {
280    let config = state.bug_monitor_config().await;
281    let Some(timeout_ms) = config.triage_timeout_ms else {
282        return Ok(Vec::new());
283    };
284    if !config.enabled || config.paused {
285        return Ok(Vec::new());
286    }
287
288    let now = now_ms();
289    let drafts = {
290        let guard = state.bug_monitor_drafts.read().await;
291        guard.values().cloned().collect::<Vec<_>>()
292    };
293
294    let mut recovered = Vec::new();
295    for draft in drafts {
296        let Some(triage_run_id) = draft.triage_run_id.clone() else {
297            continue;
298        };
299        if draft_has_github_issue(&draft) {
300            continue;
301        }
302        if draft_is_triage_timed_out(&draft) {
303            let incident_id =
304                bug_monitor_incident_for_draft(state, &draft.draft_id, &triage_run_id).await;
305            recovered.push((draft.draft_id.clone(), incident_id));
306            continue;
307        }
308        match crate::http::bug_monitor::finalize_completed_bug_monitor_triage(
309            state,
310            &draft.draft_id,
311        )
312        .await
313        {
314            Ok(true) => continue,
315            Ok(false) => {}
316            Err(error) => {
317                tracing::warn!(
318                    draft_id = %draft.draft_id,
319                    triage_run_id = %triage_run_id,
320                    error = %error,
321                    "failed to finalize completed Bug Monitor triage during recovery scan",
322                );
323            }
324        }
325
326        let run_created_at_ms =
327            crate::http::bug_monitor::bug_monitor_triage_effective_started_at_ms(
328                state,
329                &triage_run_id,
330            )
331            .await
332            .unwrap_or(draft.created_at_ms);
333        if now < bug_monitor_triage_timeout_deadline_ms(run_created_at_ms, timeout_ms) {
334            continue;
335        }
336
337        let diagnostics_value = crate::http::bug_monitor::bug_monitor_triage_timeout_diagnostics(
338            state,
339            &triage_run_id,
340            timeout_ms,
341        )
342        .await;
343        let last_post_error = compose_triage_timeout_last_post_error(
344            &triage_run_id,
345            timeout_ms,
346            diagnostics_value.as_ref(),
347        );
348        // Atomic CAS: only the caller that actually flips github_status
349        // to triage_timed_out continues into the publish path. A second
350        // concurrent recover_overdue invocation reading the same
351        // not-yet-timed-out draft will see `Ok(None)` here and skip the
352        // publish — closing the race that produced duplicate GitHub
353        // issues (#45 / #46) when two status pollers fire near
354        // simultaneously. A persistence failure surfaces as `Err` and
355        // is propagated via `?` so we don't publish without a durable
356        // marker (which would re-publish on restart and create a
357        // duplicate).
358        let Some(current_draft) = state
359            .try_mark_triage_timed_out(&draft.draft_id, last_post_error.clone())
360            .await?
361        else {
362            continue;
363        };
364
365        let incident_id =
366            bug_monitor_incident_for_draft(state, &current_draft.draft_id, &triage_run_id).await;
367        if let Some(incident_id) = incident_id.as_deref() {
368            if let Some(mut incident) = state.get_bug_monitor_incident(&incident_id).await {
369                incident.status = "triage_timed_out".to_string();
370                incident.last_error = Some(last_post_error.clone());
371                incident.updated_at_ms = now;
372                state.put_bug_monitor_incident(incident.clone()).await?;
373                let mut event_payload = serde_json::json!({
374                    "incident_id": incident.incident_id,
375                    "draft_id": current_draft.draft_id,
376                    "triage_run_id": triage_run_id,
377                    "timeout_ms": timeout_ms,
378                });
379                if let Some(diagnostics) = diagnostics_value.as_ref() {
380                    if let Some(obj) = event_payload.as_object_mut() {
381                        obj.insert("diagnostics".to_string(), diagnostics.clone());
382                    }
383                }
384                state.event_bus.publish(EngineEvent::new(
385                    "bug_monitor.incident.triage_timed_out",
386                    event_payload,
387                ));
388            }
389        }
390
391        recovered.push((current_draft.draft_id.clone(), incident_id));
392    }
393
394    Ok(recovered)
395}
396
397async fn recover_stale_bug_monitor_triage_event(
398    state: &AppState,
399    event: &EngineEvent,
400) -> anyhow::Result<Option<BugMonitorIncidentRecord>> {
401    if event.event_type != "automation_v2.run.paused_stale_no_provider_activity" {
402        return Ok(None);
403    }
404    let Some(triage_run_id) = first_string_deep(&event.properties, &["run_id", "runID"]) else {
405        return Ok(None);
406    };
407    let Some(draft) = ({
408        let guard = state.bug_monitor_drafts.read().await;
409        guard
410            .values()
411            .find(|draft| draft.triage_run_id.as_deref() == Some(triage_run_id.as_str()))
412            .cloned()
413    }) else {
414        return Ok(None);
415    };
416    if draft_has_github_issue(&draft) {
417        return Ok(None);
418    }
419
420    let timeout_ms = state
421        .bug_monitor_config()
422        .await
423        .triage_timeout_ms
424        .or_else(|| first_u64(&event.properties, &["stale_after_ms", "staleAfterMs"]))
425        .unwrap_or_default();
426    let diagnostics_value = crate::http::bug_monitor::bug_monitor_triage_timeout_diagnostics(
427        state,
428        &triage_run_id,
429        timeout_ms,
430    )
431    .await;
432    let last_post_error = compose_triage_timeout_last_post_error(
433        &triage_run_id,
434        timeout_ms,
435        diagnostics_value.as_ref(),
436    );
437    let marked_now = match state
438        .try_mark_triage_timed_out(&draft.draft_id, last_post_error.clone())
439        .await?
440    {
441        Some(current_draft) => Some(current_draft),
442        None => {
443            let Some(current_draft) = state.get_bug_monitor_draft(&draft.draft_id).await else {
444                return Ok(None);
445            };
446            if draft_has_github_issue(&current_draft) || !draft_is_triage_timed_out(&current_draft)
447            {
448                return Ok(None);
449            }
450            Some(current_draft)
451        }
452    };
453    let Some(current_draft) = marked_now else {
454        return Ok(None);
455    };
456
457    let incident_id =
458        bug_monitor_incident_for_draft(state, &current_draft.draft_id, &triage_run_id).await;
459    let Some(incident_id) = incident_id else {
460        return Ok(None);
461    };
462    let Some(mut incident) = state.get_bug_monitor_incident(&incident_id).await else {
463        return Ok(None);
464    };
465    let now = now_ms();
466    incident.status = "triage_timed_out".to_string();
467    incident.last_error = Some(
468        current_draft
469            .last_post_error
470            .clone()
471            .unwrap_or(last_post_error.clone()),
472    );
473    incident.updated_at_ms = now;
474    state.put_bug_monitor_incident(incident.clone()).await?;
475
476    if !draft_is_triage_timed_out(&draft) {
477        let mut event_payload = serde_json::json!({
478            "incident_id": incident.incident_id,
479            "draft_id": current_draft.draft_id,
480            "triage_run_id": triage_run_id,
481            "timeout_ms": timeout_ms,
482            "reason": "bug monitor triage automation paused after no provider activity",
483        });
484        if let Some(diagnostics) = diagnostics_value.as_ref() {
485            if let Some(obj) = event_payload.as_object_mut() {
486                obj.insert("diagnostics".to_string(), diagnostics.clone());
487            }
488        }
489        state.event_bus.publish(EngineEvent::new(
490            "bug_monitor.incident.triage_timed_out",
491            event_payload,
492        ));
493    }
494
495    match crate::bug_monitor_github::publish_draft(
496        state,
497        &current_draft.draft_id,
498        Some(&incident.incident_id),
499        crate::bug_monitor_github::PublishMode::Recovery,
500    )
501    .await
502    {
503        Ok(outcome) => {
504            incident.status = outcome.action;
505            incident.last_error = None;
506        }
507        Err(error) => {
508            incident.last_error = Some(truncate_text(&error.to_string(), 500));
509        }
510    }
511    incident.updated_at_ms = now_ms();
512    state.put_bug_monitor_incident(incident.clone()).await?;
513    Ok(Some(incident))
514}
515
516pub async fn collect_bug_monitor_excerpt(state: &AppState, properties: &Value) -> Vec<String> {
517    let mut excerpt = Vec::new();
518    if let Some(reason) = first_string(properties, &["reason", "error", "detail", "message"]) {
519        excerpt.push(reason);
520    }
521    if let Some(title) = first_string(properties, &["title", "task"]) {
522        if !excerpt.iter().any(|row| row == &title) {
523            excerpt.push(title);
524        }
525    }
526    let logs = state.logs.read().await;
527    for entry in logs.iter().rev().take(3) {
528        if let Some(message) = entry.get("message").and_then(|row| row.as_str()) {
529            excerpt.push(truncate_text(message, 240));
530        }
531    }
532    excerpt.truncate(8);
533    excerpt
534}
535
/// Whether the optional string is present and contains something other than
/// whitespace.
fn is_non_empty(value: &Option<String>) -> bool {
    matches!(value, Some(text) if !text.trim().is_empty())
}
542
/// Whether an event name denotes routine progress rather than a failure.
///
/// The name is trimmed and lowercased (ASCII); it is noise when it is
/// non-empty and contains any of the marker substrings below. A missing or
/// blank name is not noise.
fn event_is_routine_noise(event: Option<&str>) -> bool {
    const NOISE_MARKERS: [&str; 7] = [
        "progress",
        "heartbeat",
        "started",
        "queued",
        "retrying",
        "attempt.started",
        "minor_retry",
    ];

    let Some(raw) = event else {
        return false;
    };
    let lowered = raw.trim().to_ascii_lowercase();
    if lowered.is_empty() {
        return false;
    }
    NOISE_MARKERS
        .iter()
        .any(|marker| lowered.contains(*marker))
}
558
559pub fn evaluate_bug_monitor_submission_quality(
560    submission: &BugMonitorSubmission,
561) -> BugMonitorQualityGateReport {
562    let source_known = is_non_empty(&submission.source)
563        || is_non_empty(&submission.component)
564        || is_non_empty(&submission.process)
565        || is_non_empty(&submission.event);
566    let type_classified = is_non_empty(&submission.event) || is_non_empty(&submission.level);
567    let confidence_recorded = is_non_empty(&submission.confidence);
568    let dedupe_checked = is_non_empty(&submission.fingerprint);
569    let evidence_exists = !submission.evidence_refs.is_empty()
570        || !submission.excerpt.is_empty()
571        || is_non_empty(&submission.detail)
572        || is_non_empty(&submission.file_name);
573    let destination_clear = is_non_empty(&submission.expected_destination);
574    let risk_known = is_non_empty(&submission.risk_level);
575    let not_routine_noise = !event_is_routine_noise(submission.event.as_deref());
576
577    let gate_specs = [
578        (
579            "source_known",
580            "Source known",
581            source_known,
582            submission
583                .source
584                .clone()
585                .or_else(|| submission.component.clone())
586                .or_else(|| submission.process.clone())
587                .or_else(|| submission.event.clone()),
588        ),
589        (
590            "type_classified",
591            "Signal type classified",
592            type_classified,
593            submission
594                .event
595                .clone()
596                .or_else(|| submission.level.clone()),
597        ),
598        (
599            "confidence_recorded",
600            "Confidence recorded",
601            confidence_recorded,
602            submission.confidence.clone(),
603        ),
604        (
605            "dedupe_checked",
606            "Dedupe/fingerprint checked",
607            dedupe_checked,
608            submission.fingerprint.clone(),
609        ),
610        (
611            "evidence_present",
612            "Evidence or artifact refs present",
613            evidence_exists,
614            submission
615                .evidence_refs
616                .first()
617                .cloned()
618                .or_else(|| submission.excerpt.first().cloned())
619                .or_else(|| submission.file_name.clone()),
620        ),
621        (
622            "destination_clear",
623            "Expected destination clear",
624            destination_clear,
625            submission.expected_destination.clone(),
626        ),
627        (
628            "risk_known",
629            "Risk level known",
630            risk_known,
631            submission.risk_level.clone(),
632        ),
633        (
634            "not_routine_noise",
635            "Not routine progress or minor retry",
636            not_routine_noise,
637            submission.event.clone(),
638        ),
639    ];
640
641    let gates = gate_specs
642        .into_iter()
643        .map(|(key, label, passed, detail)| BugMonitorQualityGateResult {
644            key: key.to_string(),
645            label: label.to_string(),
646            passed,
647            detail,
648        })
649        .collect::<Vec<_>>();
650    let passed_count = gates.iter().filter(|gate| gate.passed).count();
651    let missing = gates
652        .iter()
653        .filter(|gate| !gate.passed)
654        .map(|gate| gate.key.clone())
655        .collect::<Vec<_>>();
656    let passed = passed_count == gates.len();
657    BugMonitorQualityGateReport {
658        stage: "intake_to_draft".to_string(),
659        status: if passed { "passed" } else { "blocked" }.to_string(),
660        passed,
661        passed_count,
662        total_count: gates.len(),
663        blocked_reason: if passed {
664            None
665        } else {
666            Some(format!("missing quality gates: {}", missing.join(", ")))
667        },
668        gates,
669        missing,
670    }
671}
672
673pub async fn process_event(
674    state: &AppState,
675    event: &EngineEvent,
676    config: &BugMonitorConfig,
677) -> anyhow::Result<BugMonitorIncidentRecord> {
678    if let Some(reason) = recursive_triage_skip_reason(event) {
679        if let Some(incident) = recover_stale_bug_monitor_triage_event(state, event).await? {
680            return Ok(incident);
681        }
682        // Don't queue a new triage workflow for a failure that came
683        // from the bug monitor's own triage workflow. Otherwise a
684        // single triage failure spawns a triage-of-triage, which can
685        // itself fail the same way and cascade. Observed in issues
686        // #43, #47, #51 chained through `automation-v2-bug-monitor-
687        // triage-...` automation_ids. We surface this as an error so
688        // the bug-monitor runtime status reflects the skip; the
689        // poller treats this as a soft skip rather than a hard
690        // failure.
691        anyhow::bail!("skipping recursive bug monitor triage event: {reason}");
692    }
693    let submission = build_bug_monitor_submission_from_event(state, config, event).await?;
694    let duplicate_matches = crate::http::bug_monitor::bug_monitor_failure_pattern_matches(
695        state,
696        submission.repo.as_deref().unwrap_or_default(),
697        submission.fingerprint.as_deref().unwrap_or_default(),
698        submission.title.as_deref(),
699        submission.detail.as_deref(),
700        &submission.excerpt,
701        3,
702    )
703    .await;
704    let fingerprint = submission
705        .fingerprint
706        .clone()
707        .ok_or_else(|| anyhow::anyhow!("bug monitor submission fingerprint missing"))?;
708    let default_workspace_root = state.workspace_index.snapshot().await.root;
709    let workspace_root = config
710        .workspace_root
711        .clone()
712        .unwrap_or(default_workspace_root);
713    let now = crate::util::time::now_ms();
714    let quality_gate = evaluate_bug_monitor_submission_quality(&submission);
715
716    let existing = state
717        .bug_monitor_incidents
718        .read()
719        .await
720        .values()
721        .find(|row| row.fingerprint == fingerprint)
722        .cloned();
723
724    let mut incident = if let Some(mut row) = existing {
725        row.occurrence_count = row.occurrence_count.saturating_add(1);
726        row.updated_at_ms = now;
727        row.last_seen_at_ms = Some(now);
728        if row.excerpt.is_empty() {
729            row.excerpt = submission.excerpt.clone();
730        }
731        if row.confidence.is_none() {
732            row.confidence = submission.confidence.clone();
733        }
734        if row.risk_level.is_none() {
735            row.risk_level = submission.risk_level.clone();
736        }
737        if row.expected_destination.is_none() {
738            row.expected_destination = submission.expected_destination.clone();
739        }
740        row.quality_gate = Some(quality_gate.clone());
741        for evidence_ref in &submission.evidence_refs {
742            if !row
743                .evidence_refs
744                .iter()
745                .any(|existing| existing == evidence_ref)
746            {
747                row.evidence_refs.push(evidence_ref.clone());
748            }
749        }
750        row
751    } else {
752        BugMonitorIncidentRecord {
753            incident_id: format!("failure-incident-{}", uuid::Uuid::new_v4().simple()),
754            fingerprint: fingerprint.clone(),
755            event_type: event.event_type.clone(),
756            status: "queued".to_string(),
757            repo: submission.repo.clone().unwrap_or_default(),
758            workspace_root,
759            title: submission
760                .title
761                .clone()
762                .unwrap_or_else(|| format!("Failure detected in {}", event.event_type)),
763            detail: submission.detail.clone(),
764            excerpt: submission.excerpt.clone(),
765            source: submission.source.clone(),
766            run_id: submission.run_id.clone(),
767            session_id: submission.session_id.clone(),
768            correlation_id: submission.correlation_id.clone(),
769            component: submission.component.clone(),
770            level: submission.level.clone(),
771            occurrence_count: 1,
772            created_at_ms: now,
773            updated_at_ms: now,
774            last_seen_at_ms: Some(now),
775            draft_id: None,
776            triage_run_id: None,
777            last_error: None,
778            confidence: submission.confidence.clone(),
779            risk_level: submission.risk_level.clone(),
780            expected_destination: submission.expected_destination.clone(),
781            evidence_refs: submission.evidence_refs.clone(),
782            quality_gate: Some(quality_gate.clone()),
783            duplicate_summary: None,
784            duplicate_matches: None,
785            event_payload: Some(event.properties.clone()),
786        }
787    };
788    state.put_bug_monitor_incident(incident.clone()).await?;
789
790    if !duplicate_matches.is_empty() {
791        incident.status = "duplicate_suppressed".to_string();
792        let duplicate_summary =
793            crate::http::bug_monitor::build_bug_monitor_duplicate_summary(&duplicate_matches);
794        incident.duplicate_summary = Some(duplicate_summary.clone());
795        incident.duplicate_matches = Some(duplicate_matches.clone());
796        incident.updated_at_ms = crate::util::time::now_ms();
797        state.put_bug_monitor_incident(incident.clone()).await?;
798        state.event_bus.publish(EngineEvent::new(
799            "bug_monitor.incident.duplicate_suppressed",
800            serde_json::json!({
801                "incident_id": incident.incident_id,
802                "fingerprint": incident.fingerprint,
803                "eventType": incident.event_type,
804                "status": incident.status,
805                "duplicate_summary": duplicate_summary,
806                "duplicate_matches": duplicate_matches,
807            }),
808        ));
809        return Ok(incident);
810    }
811
812    let draft = match state.submit_bug_monitor_draft(submission).await {
813        Ok(draft) => draft,
814        Err(error) => {
815            incident.status = "draft_failed".to_string();
816            incident.last_error = Some(truncate_text(&error.to_string(), 500));
817            incident.updated_at_ms = crate::util::time::now_ms();
818            state.put_bug_monitor_incident(incident.clone()).await?;
819            state.event_bus.publish(EngineEvent::new(
820                "bug_monitor.incident.detected",
821                serde_json::json!({
822                    "incident_id": incident.incident_id,
823                    "fingerprint": incident.fingerprint,
824                    "eventType": incident.event_type,
825                    "draft_id": incident.draft_id,
826                    "triage_run_id": incident.triage_run_id,
827                    "status": incident.status,
828                    "detail": incident.last_error,
829                }),
830            ));
831            return Ok(incident);
832        }
833    };
834    incident.draft_id = Some(draft.draft_id.clone());
835    incident.status = "draft_created".to_string();
836    state.put_bug_monitor_incident(incident.clone()).await?;
837
838    match crate::http::bug_monitor::ensure_bug_monitor_triage_run(
839        state.clone(),
840        &draft.draft_id,
841        true,
842    )
843    .await
844    {
845        Ok((updated_draft, _run_id, _deduped)) => {
846            incident.triage_run_id = updated_draft.triage_run_id.clone();
847            if incident.triage_run_id.is_some() {
848                incident.status = "triage_queued".to_string();
849            }
850            incident.last_error = None;
851        }
852        Err(error) => {
853            incident.status = "draft_created".to_string();
854            incident.last_error = Some(truncate_text(&error.to_string(), 500));
855        }
856    }
857
858    if let Some(draft_id) = incident.draft_id.clone() {
859        let latest_draft = state
860            .get_bug_monitor_draft(&draft_id)
861            .await
862            .unwrap_or(draft.clone());
863        match crate::bug_monitor_github::publish_draft(
864            state,
865            &draft_id,
866            Some(&incident.incident_id),
867            crate::bug_monitor_github::PublishMode::Auto,
868        )
869        .await
870        {
871            Ok(outcome) => {
872                incident.status = outcome.action;
873                incident.last_error = None;
874            }
875            Err(error) => {
876                let detail = truncate_text(&error.to_string(), 500);
877                incident.last_error = Some(detail.clone());
878                let mut failed_draft = latest_draft;
879                failed_draft.status = "github_post_failed".to_string();
880                failed_draft.github_status = Some("github_post_failed".to_string());
881                failed_draft.last_post_error = Some(detail.clone());
882                let evidence_digest = failed_draft.evidence_digest.clone();
883                if let Err(persist_err) = state.put_bug_monitor_draft(failed_draft.clone()).await {
884                    tracing::warn!(
885                        incident_id = %incident.incident_id,
886                        draft_id = %failed_draft.draft_id,
887                        error = %persist_err,
888                        "failed to persist bug monitor draft after auto-post failure",
889                    );
890                }
891                if let Err(record_err) = crate::bug_monitor_github::record_post_failure(
892                    state,
893                    &failed_draft,
894                    Some(&incident.incident_id),
895                    "auto_post",
896                    evidence_digest.as_deref(),
897                    &detail,
898                )
899                .await
900                {
901                    tracing::warn!(
902                        incident_id = %incident.incident_id,
903                        draft_id = %failed_draft.draft_id,
904                        error = %record_err,
905                        "failed to record bug monitor post failure",
906                    );
907                }
908            }
909        }
910
911        if let Some(triage_run_id) = incident.triage_run_id.clone() {
912            if let Some(timeout_ms) = config.triage_timeout_ms {
913                spawn_triage_deadline_task(
914                    state.clone(),
915                    incident.incident_id.clone(),
916                    draft_id.clone(),
917                    triage_run_id,
918                    timeout_ms,
919                );
920            }
921        }
922    }
923
924    incident.updated_at_ms = crate::util::time::now_ms();
925    state.put_bug_monitor_incident(incident.clone()).await?;
926    state.event_bus.publish(EngineEvent::new(
927        "bug_monitor.incident.detected",
928        serde_json::json!({
929            "incident_id": incident.incident_id,
930            "fingerprint": incident.fingerprint,
931            "eventType": incident.event_type,
932            "draft_id": incident.draft_id,
933            "triage_run_id": incident.triage_run_id,
934            "status": incident.status,
935        }),
936    ));
937    Ok(incident)
938}
939pub fn first_string(properties: &Value, keys: &[&str]) -> Option<String> {
940    for key in keys {
941        if let Some(value) = properties.get(*key).and_then(|row| row.as_str()) {
942            let trimmed = value.trim();
943            if !trimmed.is_empty() {
944                return Some(trimmed.to_string());
945            }
946        }
947    }
948    None
949}
950
951fn get_path_value<'a>(value: &'a Value, key: &str) -> Option<&'a Value> {
952    if key.contains('.') {
953        let mut current = value;
954        for part in key.split('.') {
955            current = current.get(part)?;
956        }
957        Some(current)
958    } else {
959        value.get(key)
960    }
961}
962
963fn first_value<'a>(properties: &'a Value, keys: &[&str]) -> Option<&'a Value> {
964    keys.iter().find_map(|key| get_path_value(properties, key))
965}
966
967fn first_string_deep(properties: &Value, keys: &[&str]) -> Option<String> {
968    for key in keys {
969        if let Some(value) = get_path_value(properties, key) {
970            if let Some(text) = value
971                .as_str()
972                .map(str::trim)
973                .filter(|value| !value.is_empty())
974            {
975                return Some(text.to_string());
976            }
977            if value.is_number() || value.is_boolean() {
978                return Some(value.to_string());
979            }
980        }
981    }
982    None
983}
984
985fn first_u64(properties: &Value, keys: &[&str]) -> Option<u64> {
986    for key in keys {
987        if let Some(value) = get_path_value(properties, key) {
988            if let Some(number) = value.as_u64() {
989                return Some(number);
990            }
991            if let Some(text) = value.as_str() {
992                if let Ok(number) = text.trim().parse::<u64>() {
993                    return Some(number);
994                }
995            }
996        }
997    }
998    None
999}
1000
1001fn strings_from_value(value: Option<&Value>, max_items: usize) -> Vec<String> {
1002    let mut rows = match value {
1003        Some(Value::Array(items)) => items
1004            .iter()
1005            .filter_map(|item| {
1006                item.as_str()
1007                    .map(str::trim)
1008                    .filter(|text| !text.is_empty())
1009                    .map(ToString::to_string)
1010                    .or_else(|| {
1011                        if item.is_object() || item.is_array() {
1012                            Some(truncate_text(&sanitize_json_value(item).to_string(), 300))
1013                        } else {
1014                            None
1015                        }
1016                    })
1017            })
1018            .collect::<Vec<_>>(),
1019        Some(Value::String(text)) => text
1020            .lines()
1021            .map(str::trim)
1022            .filter(|text| !text.is_empty())
1023            .map(ToString::to_string)
1024            .collect::<Vec<_>>(),
1025        Some(value) if value.is_object() => {
1026            vec![truncate_text(&sanitize_json_value(value).to_string(), 300)]
1027        }
1028        _ => Vec::new(),
1029    };
1030    rows.truncate(max_items);
1031    rows
1032}
1033
/// Reports whether a JSON object key names a value that must be redacted
/// before the payload is copied into drafts, events, or issue text.
///
/// A key is sensitive when, case-insensitively, it contains one of
/// `token`, `secret`, `password`, `credential`, `authorization`, or when it
/// denotes a "key" credential in any common spelling:
///
/// * snake_case / kebab-case suffix: `api_key`, `x-api-key`
/// * camelCase suffix: `apiKey`, `privateKey`
/// * the run-together form `apikey` (any casing)
///
/// Event payloads in this module are read with both snake_case and
/// camelCase aliases, so the camelCase forms have to be covered too.
/// Words that merely end in the letters "key" (`monkey`) and the bare key
/// `key` are not treated as sensitive.
fn redacted_key(key: &str) -> bool {
    const SENSITIVE_FRAGMENTS: [&str; 5] = [
        "token",
        "secret",
        "password",
        "credential",
        "authorization",
    ];

    let normalized = key.to_ascii_lowercase();
    if SENSITIVE_FRAGMENTS
        .iter()
        .any(|fragment| normalized.contains(fragment))
    {
        return true;
    }

    // `api_key`, `SIGNING_KEY`, `x-api-key`, ...
    if normalized.ends_with("_key") || normalized.ends_with("-key") {
        return true;
    }

    // camelCase: a capital-K `Key` suffix directly after a lowercase letter or
    // digit marks a word boundary (`apiKey`), which lower-casing would erase.
    let camel_case_key_suffix = key.strip_suffix("Key").map_or(false, |prefix| {
        matches!(
            prefix.chars().next_back(),
            Some(last) if last.is_ascii_lowercase() || last.is_ascii_digit()
        )
    });

    camel_case_key_suffix || normalized == "apikey"
}
1044
1045fn sanitize_json_value(value: &Value) -> Value {
1046    match value {
1047        Value::Object(map) => Value::Object(
1048            map.iter()
1049                .map(|(key, value)| {
1050                    if redacted_key(key) {
1051                        (key.clone(), Value::String("[redacted]".to_string()))
1052                    } else {
1053                        (key.clone(), sanitize_json_value(value))
1054                    }
1055                })
1056                .collect::<Map<String, Value>>(),
1057        ),
1058        Value::Array(items) => {
1059            Value::Array(items.iter().take(40).map(sanitize_json_value).collect())
1060        }
1061        Value::String(text) => Value::String(truncate_text(text, 1_000)),
1062        _ => value.clone(),
1063    }
1064}
1065
/// Formats a `label: value` line; a missing value renders as an empty string
/// after the colon and space.
fn field_line(label: &str, value: Option<String>) -> String {
    let rendered = match value {
        Some(text) => text,
        None => String::new(),
    };
    format!("{label}: {rendered}")
}
1069
/// Builds a [`BugMonitorSubmission`] describing the failure carried by `event`.
///
/// The function reads a fixed set of fields out of `event.properties`
/// (accepting both snake_case and camelCase spellings, and a few dotted
/// `task.*` paths), derives a fingerprint and a human-readable title, and
/// assembles a multi-line `detail` text that ends with a sanitized,
/// truncated copy of the whole payload.
///
/// # Errors
///
/// Returns an error when `config.repo` is not set.
pub async fn build_bug_monitor_submission_from_event(
    state: &AppState,
    config: &BugMonitorConfig,
    event: &EngineEvent,
) -> Result<BugMonitorSubmission> {
    let repo = config
        .repo
        .clone()
        .ok_or_else(|| anyhow::anyhow!("Bug Monitor repo is not configured"))?;
    // The configured workspace root wins; otherwise fall back to the root
    // reported by the workspace index snapshot.
    let default_workspace_root = state.workspace_index.snapshot().await.root;
    let workspace_root = config
        .workspace_root
        .clone()
        .unwrap_or(default_workspace_root);
    // First usable failure description, tried in this order.
    let reason = first_string_deep(
        &event.properties,
        &[
            "reason",
            "error",
            "detail",
            "message",
            "summary",
            "task.last_error",
            "task.payload.reason",
            "task.payload.error",
        ],
    );
    let workflow_id = first_string_deep(&event.properties, &["workflow_id", "workflowID"]);
    let workflow_name = first_string_deep(&event.properties, &["workflow_name", "workflowName"]);
    let run_id = first_string_deep(&event.properties, &["run_id", "runID"]);
    let session_id = first_string_deep(&event.properties, &["session_id", "sessionID"]);
    // When the payload carries no explicit task/stage/node id, fall back to a
    // node id parsed out of the reason text (capped at 160 characters).
    let inferred_node_id = reason
        .as_deref()
        .and_then(node_id_from_failure_reason)
        .map(|value| truncate_text(&value, 160));
    let task_id = first_string_deep(&event.properties, &["task_id", "taskID", "task.id"])
        .or_else(|| inferred_node_id.clone());
    let stage_id = first_string_deep(&event.properties, &["stage_id", "stageID", "actionID"])
        .or_else(|| inferred_node_id.clone());
    let node_id = first_string_deep(&event.properties, &["node_id", "nodeID"])
        .or_else(|| inferred_node_id.clone());
    let automation_id = first_string_deep(&event.properties, &["automation_id", "automationID"]);
    let routine_id = first_string_deep(&event.properties, &["routine_id", "routineID"]);
    let agent_role = first_string_deep(&event.properties, &["agent_role", "agentRole"]);
    let error_kind = first_string_deep(
        &event.properties,
        &["error_kind", "errorKind", "failure_kind", "failureKind"],
    );
    let tool_name = first_string_deep(&event.properties, &["tool_name", "toolName", "tool"]);
    let suggested_next_action = first_string_deep(
        &event.properties,
        &["suggested_next_action", "suggestedNextAction"],
    );
    // Prefer an explicit expected-output string; otherwise render the output
    // contract as sanitized JSON capped at 800 characters.
    let expected_output =
        first_string_deep(&event.properties, &["expected_output", "expectedOutput"]).or_else(
            || {
                first_value(&event.properties, &["output_contract", "outputContract"])
                    .map(|value| truncate_text(&sanitize_json_value(value).to_string(), 800))
            },
        );
    let actual_output = first_string_deep(&event.properties, &["actual_output", "actualOutput"]);
    let tool_args_summary =
        first_value(&event.properties, &["tool_args_summary", "toolArgsSummary"])
            .map(|value| truncate_text(&sanitize_json_value(value).to_string(), 800));
    let tool_result_excerpt = first_string_deep(
        &event.properties,
        &["tool_result_excerpt", "toolResultExcerpt"],
    );
    let attempt = first_u64(&event.properties, &["attempt", "task.attempt"]);
    let max_attempts = first_u64(
        &event.properties,
        &["max_attempts", "maxAttempts", "task.max_attempts"],
    );
    // An explicit boolean flag wins; otherwise retries count as exhausted when
    // both counters are present, `max_attempts` is non-zero, and
    // `attempt >= max_attempts`.
    let retry_exhausted = first_value(&event.properties, &["retry_exhausted", "retryExhausted"])
        .and_then(Value::as_bool)
        .unwrap_or_else(|| {
            attempt
                .zip(max_attempts)
                .map(|(attempt, max)| max > 0 && attempt >= max)
                .unwrap_or(false)
        });
    let files_touched = strings_from_value(
        first_value(
            &event.properties,
            &["files_touched", "filesTouched", "changed_files"],
        ),
        20,
    );
    let artifact_refs = strings_from_value(
        first_value(
            &event.properties,
            &["artifact_refs", "artifactRefs", "artifacts"],
        ),
        20,
    );
    // Evidence refs start from the artifact refs, then append any explicit
    // evidence refs that are not already present.
    let mut evidence_refs = artifact_refs.clone();
    for evidence_ref in strings_from_value(
        first_value(&event.properties, &["evidence_refs", "evidenceRefs"]),
        20,
    ) {
        if !evidence_refs
            .iter()
            .any(|existing| existing == &evidence_ref)
        {
            evidence_refs.push(evidence_ref);
        }
    }
    let validation_errors = strings_from_value(
        first_value(
            &event.properties,
            &["validation_errors", "validationErrors"],
        ),
        12,
    );
    let missing_workspace_files = strings_from_value(
        first_value(
            &event.properties,
            &["missing_workspace_files", "missingWorkspaceFiles"],
        ),
        20,
    );
    let required_next_tool_actions = strings_from_value(
        first_value(
            &event.properties,
            &["required_next_tool_actions", "requiredNextToolActions"],
        ),
        20,
    );
    let recent_attempt_evidence = strings_from_value(
        first_value(
            &event.properties,
            &[
                "recent_node_attempt_evidence",
                "recentNodeAttemptEvidence",
                "prior_attempt_evidence",
                "priorAttemptEvidence",
            ],
        ),
        12,
    );
    let correlation_id = first_string_deep(
        &event.properties,
        &[
            "correlationID",
            "correlation_id",
            "commandID",
            "command_id",
            "eventID",
        ],
    );
    // `component` is the first available identifier from this list, so it may
    // duplicate one of the ids extracted above.
    let component = first_string_deep(
        &event.properties,
        &[
            "component",
            "routine_id",
            "routineID",
            "workflow_id",
            "workflowID",
            "automation_id",
            "automationID",
            "node_id",
            "nodeID",
            "stage_id",
            "task",
            "title",
        ],
    );
    // The next three fields always end up `Some`: the payload value (length
    // capped) or a fixed default.
    let confidence = first_string_deep(
        &event.properties,
        &["confidence", "signal_confidence", "signalConfidence"],
    )
    .map(|value| truncate_text(&value, 80))
    .or_else(|| Some("high".to_string()));
    let risk_level = first_string_deep(&event.properties, &["risk_level", "riskLevel", "risk"])
        .map(|value| truncate_text(&value, 80))
        .or_else(|| Some("medium".to_string()));
    let expected_destination = first_string_deep(
        &event.properties,
        &["expected_destination", "expectedDestination"],
    )
    .map(|value| truncate_text(&value, 120))
    .or_else(|| Some("bug_monitor_issue_draft".to_string()));
    // Use the collected excerpt; if there is none, fall back to the reason.
    let mut excerpt = collect_bug_monitor_excerpt(state, &event.properties).await;
    if excerpt.is_empty() {
        if let Some(reason) = reason.as_ref() {
            excerpt.push(reason.clone());
        }
    }
    let sanitized_properties = sanitize_json_value(&event.properties);
    let serialized = serde_json::to_string(&sanitized_properties).unwrap_or_default();
    // Per-run identifiers are stripped from the reason before hashing.
    let normalized_reason = normalize_reason_for_fingerprint(reason.as_deref().unwrap_or(""));
    let mut fingerprint = sha256_hex(&[
        repo.as_str(),
        workspace_root.as_str(),
        event.event_type.as_str(),
        normalized_reason.as_str(),
        workflow_id.as_deref().unwrap_or(""),
        task_id.as_deref().unwrap_or(""),
        stage_id.as_deref().unwrap_or(""),
        node_id.as_deref().unwrap_or(""),
        // Excluded: `run_id`, `session_id`, `correlation_id`. Those
        // vary per workflow run, so including them in the hash made
        // every recurrence look like a brand-new incident — dedup
        // never matched, occurrence_count stayed at 1, and 13
        // recurrences of the same node failure produced 13 separate
        // GitHub issues in a single afternoon. The remaining inputs
        // (repo, workspace_root, event_type, reason, workflow_id,
        // task_id, stage_id, node_id, component) keep the fingerprint
        // structurally specific without the per-run noise.
        component.as_deref().unwrap_or(""),
    ]);
    // If the helper returns a fingerprint, it replaces the hash computed
    // above. NOTE(review): judging by its name, this reuses the fingerprint of
    // an existing node-level incident for an aggregate run outcome — confirm
    // against the helper's definition.
    if let Some(node_fingerprint) = existing_node_incident_fingerprint_for_aggregate_outcome(
        state,
        &repo,
        &workspace_root,
        &event.event_type,
        workflow_id.as_deref().or(automation_id.as_deref()),
        run_id.as_deref(),
        node_id
            .as_deref()
            .or(task_id.as_deref())
            .or(stage_id.as_deref()),
        reason.as_deref(),
    )
    .await
    {
        fingerprint = node_fingerprint;
    }
    // Most specific location available for the title: stage, then node, then
    // task, then component.
    let failure_place = stage_id
        .as_ref()
        .or(node_id.as_ref())
        .or(task_id.as_ref())
        .or(component.as_ref());
    // Title text: the reason capped at 120 characters, or the event type when
    // no reason was found.
    let title_reason = reason
        .as_deref()
        .map(|row| truncate_text(row, 120))
        .unwrap_or_else(|| event.event_type.clone());
    let title = if let Some(workflow_id) = workflow_id.as_ref().or(automation_id.as_ref()) {
        if let Some(place) = failure_place {
            format!("Workflow {workflow_id} failed at {place}: {title_reason}")
        } else {
            format!("Workflow {workflow_id} failed: {title_reason}")
        }
    } else if let Some(routine_id) = routine_id.as_ref() {
        format!("Routine {routine_id} failed: {title_reason}")
    } else if let Some(component) = component.as_ref() {
        format!(
            "{} failure in {}: {}",
            event.event_type, component, title_reason
        )
    } else {
        format!("{}: {}", event.event_type, title_reason)
    };
    // The detail body is a fixed sequence of `label: value` lines and labelled
    // sections; missing values render as empty lines so the layout is stable.
    let mut detail_lines = vec![
        format!("event_type: {}", event.event_type),
        format!("repo: {}", repo),
        format!("workspace_root: {}", workspace_root),
        field_line("workflow_id", workflow_id.clone().or(automation_id.clone())),
        field_line("workflow_name", workflow_name.clone()),
        field_line("run_id", run_id.clone()),
        field_line("session_id", session_id.clone()),
        field_line("task_id", task_id.clone()),
        field_line("stage_id", stage_id.clone()),
        field_line("node_id", node_id.clone()),
        field_line("component", component.clone()),
        field_line("agent_role", agent_role.clone()),
        field_line("attempt", attempt.map(|value| value.to_string())),
        field_line("max_attempts", max_attempts.map(|value| value.to_string())),
        format!("retry_exhausted: {retry_exhausted}"),
        field_line("confidence", confidence.clone()),
        field_line("risk_level", risk_level.clone()),
        field_line("expected_destination", expected_destination.clone()),
        field_line("error_kind", error_kind.clone()),
        field_line("reason", reason.clone()),
        String::new(),
        "expected_output:".to_string(),
        expected_output.unwrap_or_default(),
        String::new(),
        "actual_output:".to_string(),
        actual_output.unwrap_or_default(),
        String::new(),
        field_line("tool", tool_name.clone()),
        "tool_args_summary:".to_string(),
        tool_args_summary.unwrap_or_default(),
        "tool_result_excerpt:".to_string(),
        tool_result_excerpt.unwrap_or_default(),
        String::new(),
        "artifact_refs:".to_string(),
        if artifact_refs.is_empty() {
            String::new()
        } else {
            artifact_refs.join("\n")
        },
        "files_touched:".to_string(),
        if files_touched.is_empty() {
            String::new()
        } else {
            files_touched.join("\n")
        },
        "validation_errors:".to_string(),
        if validation_errors.is_empty() {
            String::new()
        } else {
            validation_errors.join("\n")
        },
        "missing_workspace_files:".to_string(),
        if missing_workspace_files.is_empty() {
            String::new()
        } else {
            missing_workspace_files.join("\n")
        },
        "required_next_tool_actions:".to_string(),
        if required_next_tool_actions.is_empty() {
            String::new()
        } else {
            required_next_tool_actions.join("\n")
        },
        "recent_node_attempt_evidence:".to_string(),
        if recent_attempt_evidence.is_empty() {
            String::new()
        } else {
            recent_attempt_evidence.join("\n")
        },
        String::new(),
        "suggested_next_action:".to_string(),
        suggested_next_action.unwrap_or_default(),
    ];
    // Append the sanitized payload, capped at 4,000 characters.
    if !serialized.trim().is_empty() {
        detail_lines.push(String::new());
        detail_lines.push("payload:".to_string());
        detail_lines.push(truncate_text(&serialized, 4_000));
    }

    Ok(BugMonitorSubmission {
        project_id: None,
        // NOTE(review): the resolved `workspace_root` feeds the fingerprint and
        // the detail text but is not set on the submission — confirm this is
        // intentional.
        workspace_root: None,
        log_source_id: None,
        repo: Some(repo),
        title: Some(title),
        detail: Some(detail_lines.join("\n")),
        // An explicit `source` property wins; otherwise the source is derived
        // from the event type.
        source: Some(
            first_string_deep(&event.properties, &["source"]).unwrap_or_else(|| {
                match event.event_type.as_str() {
                    "automation_v2.run.failed" | "automation.run.failed" => "automation_v2",
                    "automation_v2.run.paused_stale_no_provider_activity" => "automation_v2",
                    "workflow.run.failed" | "workflow.validation.failed" => "autonomous_workflow",
                    "routine.run.failed" => "routine",
                    "context.task.failed" | "context.task.blocked" | "context.run.failed" => {
                        "context_run"
                    }
                    "coder.run.failed" => "coder",
                    _ => "tandem_events",
                }
                .to_string()
            }),
        ),
        run_id,
        session_id,
        correlation_id,
        file_name: files_touched.first().cloned(),
        process: Some("tandem-engine".to_string()),
        component,
        event: Some(event.event_type.clone()),
        level: Some("error".to_string()),
        excerpt,
        fingerprint: Some(fingerprint),
        confidence,
        risk_level,
        expected_destination,
        evidence_refs,
    })
}
1442
/// Spawns a detached deadline task that fires after `timeout_ms`
/// (immediately when `timeout_ms` is 0). If the triage run reached a
/// terminal status, the task first tries to finalize the completed
/// triage and stops there when finalization reports success. Otherwise,
/// or if finalization declines or fails, it marks the draft as
/// `triage_timed_out`, persists it, updates the incident, then re-runs
/// `publish_draft` in `Recovery` mode. Nothing is done when the draft no
/// longer exists or already has a GitHub issue. The triage_run_id is
/// left untouched on the draft so the UI can still link to the
/// abandoned run.
fn spawn_triage_deadline_task(
    state: AppState,
    incident_id: String,
    draft_id: String,
    triage_run_id: String,
    timeout_ms: u64,
) {
    tokio::spawn(async move {
        if timeout_ms > 0 {
            tokio::time::sleep(std::time::Duration::from_millis(timeout_ms)).await;
        }
        if crate::http::bug_monitor::bug_monitor_triage_run_is_terminal(&state, &triage_run_id)
            .await
        {
            match crate::http::bug_monitor::finalize_completed_bug_monitor_triage(&state, &draft_id)
                .await
            {
                // Finalization handled the run; no timeout handling needed.
                Ok(true) => return,
                // Finalization declined; fall through to the timeout path.
                Ok(false) => {}
                // Finalization failed; log and fall through to the timeout path.
                Err(error) => {
                    tracing::warn!(
                        incident_id = %incident_id,
                        draft_id = %draft_id,
                        triage_run_id = %triage_run_id,
                        error = %error,
                        "failed to finalize terminal Bug Monitor triage run at deadline",
                    );
                }
            }
        }
        // Timestamp later written to the incident's `updated_at_ms`; taken
        // before the awaits below.
        let now = crate::util::time::now_ms();
        let Some(mut draft) = state.get_bug_monitor_draft(&draft_id).await else {
            return;
        };
        // Remember whether the draft was already flagged, so the draft write
        // and the event below are not repeated.
        let already_marked = draft_is_triage_timed_out(&draft);
        if draft_has_github_issue(&draft) {
            return;
        }
        let diagnostics_value = crate::http::bug_monitor::bug_monitor_triage_timeout_diagnostics(
            &state,
            &triage_run_id,
            timeout_ms,
        )
        .await;
        let last_post_error = compose_triage_timeout_last_post_error(
            &triage_run_id,
            timeout_ms,
            diagnostics_value.as_ref(),
        );
        if !already_marked {
            draft.github_status = Some("triage_timed_out".to_string());
            draft.last_post_error = Some(last_post_error.clone());
            // Abort when the draft cannot be persisted: the incident update
            // and the fallback publish below are skipped.
            if let Err(error) = state.put_bug_monitor_draft(draft.clone()).await {
                tracing::warn!(
                    incident_id = %incident_id,
                    draft_id = %draft_id,
                    error = %error,
                    "failed to persist bug monitor draft after triage deadline",
                );
                return;
            }
        }
        if let Some(mut incident) = state.get_bug_monitor_incident(&incident_id).await {
            incident.status = "triage_timed_out".to_string();
            // Prefer the error text stored on the draft (which may predate this
            // task when the draft was already marked); otherwise use the
            // message composed above.
            incident.last_error = Some(
                draft
                    .last_post_error
                    .clone()
                    .unwrap_or(last_post_error.clone()),
            );
            incident.updated_at_ms = now;
            // Best effort: a failed incident write is logged but does not stop
            // the event or the fallback publish.
            if let Err(error) = state.put_bug_monitor_incident(incident.clone()).await {
                tracing::warn!(
                    incident_id = %incident_id,
                    error = %error,
                    "failed to persist bug monitor incident after triage deadline",
                );
            }
            // Publish the timeout event only the first time the draft is marked.
            if !already_marked {
                let mut event_payload = serde_json::json!({
                    "incident_id": incident_id,
                    "draft_id": draft_id,
                    "triage_run_id": triage_run_id,
                    "timeout_ms": timeout_ms,
                });
                if let Some(diagnostics) = diagnostics_value.as_ref() {
                    if let Some(obj) = event_payload.as_object_mut() {
                        obj.insert("diagnostics".to_string(), diagnostics.clone());
                    }
                }
                state.event_bus.publish(EngineEvent::new(
                    "bug_monitor.incident.triage_timed_out",
                    event_payload,
                ));
            }
        }
        // Fallback publish; runs whether or not the incident record was found.
        if let Err(error) = crate::bug_monitor_github::publish_draft(
            &state,
            &draft_id,
            Some(&incident_id),
            crate::bug_monitor_github::PublishMode::Recovery,
        )
        .await
        {
            tracing::warn!(
                incident_id = %incident_id,
                draft_id = %draft_id,
                triage_run_id = %triage_run_id,
                error = %error,
                "fallback publish after triage deadline failed",
            );
        }
    });
}
1564
#[cfg(test)]
mod tests {
    // Unit tests for the bug-monitor helpers in the parent module: the
    // recursive-triage skip check, failure-reason normalization used for
    // fingerprinting, node-id extraction from failure reasons, and the
    // matching of aggregate run failures against existing node incidents.
    use super::*;
    use serde_json::json;

    /// Builds an `automation_v2.run.failed` engine event carrying `properties`.
    fn event_with(properties: Value) -> EngineEvent {
        let event_type = "automation_v2.run.failed";
        EngineEvent::new(event_type, properties)
    }

    /// An `automation_id` carrying the triage prefix yields a skip reason
    /// that mentions the prefix, even when the agent role is unrelated.
    #[test]
    fn recursive_triage_skip_reason_detects_triage_automation_id_prefix() {
        let properties = json!({
            "automation_id": "automation-v2-bug-monitor-triage-failure-draft-abc123",
            "agent_role": "agent_writer",
        });
        let failure = event_with(properties);
        let skip = recursive_triage_skip_reason(&failure)
            .expect("triage automation_id prefix should trigger skip");
        assert!(skip.contains("automation-v2-bug-monitor-triage-"));
    }

    /// Events that carry the id under `workflow_id` rather than
    /// `automation_id` are detected as well.
    #[test]
    fn recursive_triage_skip_reason_detects_workflow_id_alias() {
        let failure = event_with(json!({
            "workflow_id": "automation-v2-bug-monitor-triage-failure-draft-xyz",
        }));
        let skip = recursive_triage_skip_reason(&failure);
        assert!(skip.is_some());
    }

    /// With no id present at all, the triage agent role alone yields a skip
    /// reason that mentions the role.
    #[test]
    fn recursive_triage_skip_reason_detects_triage_agent_role_when_id_missing() {
        let failure = event_with(json!({
            "agent_role": "bug_monitor_triage_agent",
        }));
        let skip =
            recursive_triage_skip_reason(&failure).expect("triage agent_role should trigger skip");
        assert!(skip.contains("bug_monitor_triage_agent"));
    }

    /// An ordinary automation id combined with an ordinary agent role
    /// produces no skip reason.
    #[test]
    fn recursive_triage_skip_reason_passes_normal_workflow_failures() {
        let failure = event_with(json!({
            "automation_id": "automation-v2-9ee33834-bf6d-4f86-acb3-3cd41d9cef19",
            "agent_role": "agent_reddit_query_researcher",
        }));
        let skip = recursive_triage_skip_reason(&failure);
        assert!(skip.is_none());
    }

    /// Regression for the P2 Codex review on PR #53. A user's custom
    /// automation may happen to use `bug_monitor_triage_agent` as its
    /// agent_id string. The agent_role backstop must NOT silently drop
    /// that automation's failures: the automation_id is present and lacks
    /// the triage prefix, so it is a real workflow failure and must be
    /// triaged normally.
    #[test]
    fn recursive_triage_skip_reason_does_not_fire_when_automation_id_is_real() {
        let failure = event_with(json!({
            "automation_id": "automation-v2-9ee33834-bf6d-4f86-acb3-3cd41d9cef19",
            "agent_role": "bug_monitor_triage_agent",
        }));
        let skip = recursive_triage_skip_reason(&failure);
        assert!(skip.is_none());
    }

    /// An event with an empty property object produces no skip reason.
    #[test]
    fn recursive_triage_skip_reason_handles_empty_properties() {
        let failure = event_with(json!({}));
        let skip = recursive_triage_skip_reason(&failure);
        assert!(skip.is_none());
    }

    /// A run id embedded in an artifact path is swapped for the `RUNID`
    /// placeholder, leaving no fragment of the original UUID behind.
    #[test]
    fn normalize_reason_replaces_automation_run_id_in_artifact_path() {
        let raw = "required output `.tandem/runs/automation-v2-run-593051dc-78bf-4927-b7db-b831b81d8bdd/artifacts/collect-recent-files.json` was not created for node `collect_recent_files`";
        let normalized = normalize_reason_for_fingerprint(raw);
        assert!(
            normalized.contains("automation-v2-run-RUNID"),
            "expected RUNID placeholder, got: {normalized}"
        );
        assert!(
            !normalized.contains("593051dc"),
            "leftover run uuid: {normalized}"
        );
    }

    /// Reasons from successive runs of the same node failure, differing
    /// only in the embedded run UUID, normalize to identical text.
    #[test]
    fn normalize_reason_collapses_recurrences_to_same_fingerprint() {
        let first_run = "required output `.tandem/runs/automation-v2-run-593051dc-78bf-4927-b7db-b831b81d8bdd/artifacts/collect-recent-files.json` was not created for node `collect_recent_files`";
        let second_run = "required output `.tandem/runs/automation-v2-run-aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee/artifacts/collect-recent-files.json` was not created for node `collect_recent_files`";
        let first_normalized = normalize_reason_for_fingerprint(first_run);
        let second_normalized = normalize_reason_for_fingerprint(second_run);
        assert_eq!(first_normalized, second_normalized);
    }

    /// Timeouts of 180000 ms and 600000 ms are genuinely different failure
    /// shapes, so normalization must keep them distinct.
    #[test]
    fn normalize_reason_preserves_numeric_values() {
        let short_timeout = "automation node `prepare_search_manifest` timed out after 180000 ms";
        let long_timeout = "automation node `prepare_search_manifest` timed out after 600000 ms";
        let short_normalized = normalize_reason_for_fingerprint(short_timeout);
        let long_normalized = normalize_reason_for_fingerprint(long_timeout);
        assert_ne!(short_normalized, long_normalized);
    }

    /// A UUID that is not part of a run id is swapped for the `UUID`
    /// placeholder.
    #[test]
    fn normalize_reason_replaces_bare_uuids() {
        let raw = "session 0251b4cc-14f3-48d1-8d81-a11c780c7d7c failed validation";
        let normalized = normalize_reason_for_fingerprint(raw);
        assert!(normalized.contains("UUID"), "got: {normalized}");
        assert!(
            !normalized.contains("0251b4cc"),
            "leftover uuid: {normalized}"
        );
    }

    /// Text containing no UUID-shaped tokens comes back unchanged.
    #[test]
    fn normalize_reason_is_idempotent_for_already_clean_text() {
        let clean = "failed to reach provider `openai-codex` at https://chatgpt.com/backend-api/codex (request error)";
        let normalized = normalize_reason_for_fingerprint(clean);
        assert_eq!(normalized, clean);
    }

    /// The node named in an aggregate "failed from node outcomes" reason is
    /// extracted.
    #[test]
    fn node_id_from_failure_reason_extracts_node_outcome() {
        let node_id =
            node_id_from_failure_reason("automation run failed from node outcomes: research_sources");
        assert_eq!(node_id.as_deref(), Some("research_sources"));
    }

    /// The back-quoted node named in a node-timeout reason is extracted.
    #[test]
    fn node_id_from_failure_reason_extracts_timed_out_node() {
        let node_id = node_id_from_failure_reason(
            "automation node `prepare_search_manifest` timed out after 180000 ms",
        );
        assert_eq!(node_id.as_deref(), Some("prepare_search_manifest"));
    }

    /// Only an incident recording a concrete failure of the requested node
    /// matches; an aggregate-outcome incident or an incident for a
    /// different node does not.
    #[test]
    fn node_incident_matches_aggregate_outcome_only_for_concrete_node_failure() {
        // Every assertion probes with the same repo/workspace/event/workflow/
        // run/node tuple; only the candidate incident varies.
        let matches_research_sources = |candidate: &BugMonitorIncidentRecord| {
            node_incident_matches_aggregate_outcome(
                candidate,
                "frumu-ai/tandem",
                "/workspace",
                "automation_v2.run.failed",
                "automation-v2-workflow",
                "automation-v2-run-1",
                "research_sources",
            )
        };

        let concrete = BugMonitorIncidentRecord {
            fingerprint: "node-fingerprint".to_string(),
            repo: "frumu-ai/tandem".to_string(),
            workspace_root: "/workspace".to_string(),
            event_type: "automation_v2.run.failed".to_string(),
            run_id: Some("automation-v2-run-1".to_string()),
            updated_at_ms: 10,
            event_payload: Some(json!({
                "workflow_id": "automation-v2-workflow",
                "run_id": "automation-v2-run-1",
                "node_id": "research_sources",
                "reason": "required_workspace_files_missing",
            })),
            ..Default::default()
        };
        assert!(matches_research_sources(&concrete));

        // Same node, but the stored reason is itself an aggregate outcome.
        let aggregate = BugMonitorIncidentRecord {
            event_payload: Some(json!({
                "workflow_id": "automation-v2-workflow",
                "run_id": "automation-v2-run-1",
                "node_id": "research_sources",
                "reason": "automation run failed from node outcomes: research_sources",
            })),
            ..concrete.clone()
        };
        assert!(!matches_research_sources(&aggregate));

        // Concrete failure, but recorded against a different node.
        let other_node = BugMonitorIncidentRecord {
            event_payload: Some(json!({
                "workflow_id": "automation-v2-workflow",
                "run_id": "automation-v2-run-1",
                "node_id": "generate_report",
                "reason": "required_workspace_files_missing",
            })),
            ..concrete.clone()
        };
        assert!(!matches_research_sources(&other_node));
    }

    /// With a concrete node incident already stored, looking up an
    /// aggregate-outcome event for the same run and node returns the stored
    /// incident's fingerprint.
    #[tokio::test]
    async fn aggregate_node_outcome_lookup_reuses_existing_node_fingerprint() {
        let state = AppState::new_starting("bug-monitor-aggregate-merge-test".to_string(), true);
        let stored = BugMonitorIncidentRecord {
            incident_id: "incident-node".to_string(),
            fingerprint: "node-fingerprint".to_string(),
            repo: "frumu-ai/tandem".to_string(),
            workspace_root: "/workspace".to_string(),
            event_type: "automation_v2.run.failed".to_string(),
            status: "draft_created".to_string(),
            title: "Node failure".to_string(),
            run_id: Some("automation-v2-run-1".to_string()),
            updated_at_ms: 10,
            event_payload: Some(json!({
                "workflow_id": "automation-v2-workflow",
                "run_id": "automation-v2-run-1",
                "node_id": "research_sources",
                "reason": "required_workspace_files_missing",
            })),
            ..Default::default()
        };
        state
            .put_bug_monitor_incident(stored)
            .await
            .expect("store incident");

        let aggregate_event = event_with(json!({
            "repo": "frumu-ai/tandem",
            "workspace_root": "/workspace",
            "workflow_id": "automation-v2-workflow",
            "run_id": "automation-v2-run-1",
            "reason": "automation run failed from node outcomes: research_sources",
            "component": "automation_v2",
        }));
        // Derive the reason and node id from the event itself rather than
        // hard-coding them.
        let reason = first_string_deep(&aggregate_event.properties, &["reason"]);
        let node_id = reason.as_deref().and_then(node_id_from_failure_reason);

        let lookup = existing_node_incident_fingerprint_for_aggregate_outcome(
            &state,
            "frumu-ai/tandem",
            "/workspace",
            &aggregate_event.event_type,
            Some("automation-v2-workflow"),
            Some("automation-v2-run-1"),
            node_id.as_deref(),
            reason.as_deref(),
        )
        .await;
        let fingerprint =
            lookup.expect("aggregate should reuse concrete node incident fingerprint");

        assert_eq!(fingerprint, "node-fingerprint");
    }
}