1use anyhow::Result;
2use serde_json::{Map, Value};
3
4use crate::bug_monitor::types::BugMonitorIncidentRecord;
5use crate::bug_monitor::types::{
6 BugMonitorConfig, BugMonitorQualityGateReport, BugMonitorQualityGateResult,
7 BugMonitorSubmission,
8};
9use crate::EngineEvent;
10use crate::{
11 app::state::{sha256_hex, truncate_text, AppState},
12 now_ms,
13};
14
/// Returns the millisecond timestamp at which a triage run that started at
/// `created_at_ms` exceeds `timeout_ms`.
///
/// The sum is clamped to `u64::MAX` instead of wrapping on overflow.
fn bug_monitor_triage_timeout_deadline_ms(created_at_ms: u64, timeout_ms: u64) -> u64 {
    match created_at_ms.checked_add(timeout_ms) {
        Some(deadline) => deadline,
        None => u64::MAX,
    }
}
18
/// Prefix that `recursive_triage_skip_reason` looks for at the start of an
/// event's automation/workflow id to recognise events emitted by the bug
/// monitor's own triage automation.
const BUG_MONITOR_TRIAGE_AUTOMATION_PREFIX: &str = "automation-v2-bug-monitor-triage-";
/// Agent role that `recursive_triage_skip_reason` compares (ASCII
/// case-insensitively) against an event's `agent_role` / `agentRole` property.
const BUG_MONITOR_TRIAGE_AGENT_ROLE: &str = "bug_monitor_triage_agent";
21
22fn normalize_reason_for_fingerprint(reason: &str) -> String {
35 let after_run_ids = automation_run_id_regex().replace_all(reason, "automation-v2-run-RUNID");
36 uuid_regex()
37 .replace_all(after_run_ids.as_ref(), "UUID")
38 .into_owned()
39}
40
41fn node_id_from_failure_reason(reason: &str) -> Option<String> {
42 for regex in [
43 node_outcomes_reason_regex(),
44 automation_node_timeout_reason_regex(),
45 ] {
46 if let Some(captures) = regex.captures(reason) {
47 let value = captures
48 .get(1)
49 .map(|match_| match_.as_str())
50 .unwrap_or_default()
51 .trim()
52 .trim_matches('`')
53 .trim();
54 if !value.is_empty() {
55 return Some(value.to_string());
56 }
57 }
58 }
59 None
60}
61
62fn is_node_outcomes_reason(reason: &str) -> bool {
63 node_outcomes_reason_regex().is_match(reason)
64}
65
66fn node_outcomes_reason_regex() -> &'static regex::Regex {
67 static REGEX: std::sync::OnceLock<regex::Regex> = std::sync::OnceLock::new();
68 REGEX.get_or_init(|| {
69 regex::Regex::new(r"(?i)node outcomes:\s*`?([A-Za-z0-9_.:-]+)`?")
70 .expect("node outcomes reason regex")
71 })
72}
73
74fn node_incident_matches_aggregate_outcome(
75 incident: &BugMonitorIncidentRecord,
76 repo: &str,
77 workspace_root: &str,
78 event_type: &str,
79 workflow_id: &str,
80 run_id: &str,
81 node_id: &str,
82) -> bool {
83 if incident.repo != repo
84 || incident.workspace_root != workspace_root
85 || incident.event_type != event_type
86 || incident.run_id.as_deref() != Some(run_id)
87 || incident.fingerprint.trim().is_empty()
88 {
89 return false;
90 }
91 let Some(payload) = incident.event_payload.as_ref() else {
92 return false;
93 };
94 let incident_workflow_id = first_string_deep(
95 payload,
96 &["workflow_id", "workflowID", "automation_id", "automationID"],
97 );
98 if incident_workflow_id.as_deref() != Some(workflow_id) {
99 return false;
100 }
101 let incident_node_id = first_string_deep(
102 payload,
103 &[
104 "node_id", "nodeID", "task_id", "taskID", "stage_id", "stageID",
105 ],
106 );
107 if incident_node_id.as_deref() != Some(node_id) {
108 return false;
109 }
110 let incident_reason = first_string_deep(payload, &["reason", "error", "message"]);
111 !incident_reason
112 .as_deref()
113 .is_some_and(is_node_outcomes_reason)
114}
115
116async fn existing_node_incident_fingerprint_for_aggregate_outcome(
117 state: &AppState,
118 repo: &str,
119 workspace_root: &str,
120 event_type: &str,
121 workflow_id: Option<&str>,
122 run_id: Option<&str>,
123 node_id: Option<&str>,
124 reason: Option<&str>,
125) -> Option<String> {
126 if !reason.is_some_and(is_node_outcomes_reason) {
127 return None;
128 }
129 let workflow_id = workflow_id?;
130 let run_id = run_id?;
131 let node_id = node_id?;
132 let mut rows = state
133 .bug_monitor_incidents
134 .read()
135 .await
136 .values()
137 .filter(|incident| {
138 node_incident_matches_aggregate_outcome(
139 incident,
140 repo,
141 workspace_root,
142 event_type,
143 workflow_id,
144 run_id,
145 node_id,
146 )
147 })
148 .cloned()
149 .collect::<Vec<_>>();
150 rows.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms));
151 rows.into_iter().next().map(|incident| incident.fingerprint)
152}
153
154fn automation_node_timeout_reason_regex() -> &'static regex::Regex {
155 static REGEX: std::sync::OnceLock<regex::Regex> = std::sync::OnceLock::new();
156 REGEX.get_or_init(|| {
157 regex::Regex::new(r"(?i)automation node\s+`([^`]+)`\s+timed out")
158 .expect("automation node timeout reason regex")
159 })
160}
161
162fn automation_run_id_regex() -> &'static regex::Regex {
163 static REGEX: std::sync::OnceLock<regex::Regex> = std::sync::OnceLock::new();
164 REGEX.get_or_init(|| {
165 regex::Regex::new(
166 r"automation-v2-run-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}",
167 )
168 .expect("automation run id regex")
169 })
170}
171
172fn uuid_regex() -> &'static regex::Regex {
173 static REGEX: std::sync::OnceLock<regex::Regex> = std::sync::OnceLock::new();
174 REGEX.get_or_init(|| {
175 regex::Regex::new(r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}")
176 .expect("uuid regex")
177 })
178}
179
180fn recursive_triage_skip_reason(event: &EngineEvent) -> Option<String> {
193 let automation_id = first_string_deep(
194 &event.properties,
195 &["automation_id", "automationID", "workflow_id", "workflowID"],
196 );
197 if let Some(id) = automation_id.as_deref() {
198 if id.starts_with(BUG_MONITOR_TRIAGE_AUTOMATION_PREFIX) {
199 return Some(format!(
200 "automation_id={id} originates from bug monitor triage"
201 ));
202 }
203 return None;
208 }
209 let agent_role = first_string_deep(&event.properties, &["agent_role", "agentRole"]);
210 if agent_role
211 .as_deref()
212 .is_some_and(|role| role.eq_ignore_ascii_case(BUG_MONITOR_TRIAGE_AGENT_ROLE))
213 {
214 return Some(format!(
215 "agent_role={} is the bug monitor triage agent",
216 agent_role.unwrap_or_default()
217 ));
218 }
219 None
220}
221
222fn compose_triage_timeout_last_post_error(
231 triage_run_id: &str,
232 timeout_ms: u64,
233 diagnostics: Option<&serde_json::Value>,
234) -> String {
235 let head =
236 format!("triage run {triage_run_id} did not reach a terminal status within {timeout_ms}ms");
237 match diagnostics {
238 Some(value) => {
239 let detail =
240 crate::http::context_runs::format_bug_monitor_triage_timeout_diagnostics(value);
241 if detail.trim().is_empty() {
242 head
243 } else {
244 format!("{head}\n{detail}")
245 }
246 }
247 None => head,
248 }
249}
250
251fn draft_has_github_issue(draft: &crate::BugMonitorDraftRecord) -> bool {
252 draft.issue_number.is_some() || draft.github_issue_url.is_some()
253}
254
255fn draft_is_triage_timed_out(draft: &crate::BugMonitorDraftRecord) -> bool {
256 draft
257 .github_status
258 .as_deref()
259 .is_some_and(|status| status.eq_ignore_ascii_case("triage_timed_out"))
260}
261
262async fn bug_monitor_incident_for_draft(
263 state: &AppState,
264 draft_id: &str,
265 triage_run_id: &str,
266) -> Option<String> {
267 let incidents = state.bug_monitor_incidents.read().await;
268 incidents
269 .values()
270 .find(|incident| {
271 incident.draft_id.as_deref() == Some(draft_id)
272 || incident.triage_run_id.as_deref() == Some(triage_run_id)
273 })
274 .map(|incident| incident.incident_id.clone())
275}
276
277pub async fn recover_overdue_bug_monitor_triage_runs(
278 state: &AppState,
279) -> anyhow::Result<Vec<(String, Option<String>)>> {
280 let config = state.bug_monitor_config().await;
281 let Some(timeout_ms) = config.triage_timeout_ms else {
282 return Ok(Vec::new());
283 };
284 if !config.enabled || config.paused {
285 return Ok(Vec::new());
286 }
287
288 let now = now_ms();
289 let drafts = {
290 let guard = state.bug_monitor_drafts.read().await;
291 guard.values().cloned().collect::<Vec<_>>()
292 };
293
294 let mut recovered = Vec::new();
295 for draft in drafts {
296 let Some(triage_run_id) = draft.triage_run_id.clone() else {
297 continue;
298 };
299 if draft_has_github_issue(&draft) {
300 continue;
301 }
302 if draft_is_triage_timed_out(&draft) {
303 let incident_id =
304 bug_monitor_incident_for_draft(state, &draft.draft_id, &triage_run_id).await;
305 recovered.push((draft.draft_id.clone(), incident_id));
306 continue;
307 }
308 match crate::http::bug_monitor::finalize_completed_bug_monitor_triage(
309 state,
310 &draft.draft_id,
311 )
312 .await
313 {
314 Ok(true) => continue,
315 Ok(false) => {}
316 Err(error) => {
317 tracing::warn!(
318 draft_id = %draft.draft_id,
319 triage_run_id = %triage_run_id,
320 error = %error,
321 "failed to finalize completed Bug Monitor triage during recovery scan",
322 );
323 }
324 }
325
326 let run_created_at_ms =
327 crate::http::bug_monitor::bug_monitor_triage_effective_started_at_ms(
328 state,
329 &triage_run_id,
330 )
331 .await
332 .unwrap_or(draft.created_at_ms);
333 if now < bug_monitor_triage_timeout_deadline_ms(run_created_at_ms, timeout_ms) {
334 continue;
335 }
336
337 let diagnostics_value = crate::http::bug_monitor::bug_monitor_triage_timeout_diagnostics(
338 state,
339 &triage_run_id,
340 timeout_ms,
341 )
342 .await;
343 let last_post_error = compose_triage_timeout_last_post_error(
344 &triage_run_id,
345 timeout_ms,
346 diagnostics_value.as_ref(),
347 );
348 let Some(current_draft) = state
359 .try_mark_triage_timed_out(&draft.draft_id, last_post_error.clone())
360 .await?
361 else {
362 continue;
363 };
364
365 let incident_id =
366 bug_monitor_incident_for_draft(state, ¤t_draft.draft_id, &triage_run_id).await;
367 if let Some(incident_id) = incident_id.as_deref() {
368 if let Some(mut incident) = state.get_bug_monitor_incident(&incident_id).await {
369 incident.status = "triage_timed_out".to_string();
370 incident.last_error = Some(last_post_error.clone());
371 incident.updated_at_ms = now;
372 state.put_bug_monitor_incident(incident.clone()).await?;
373 let mut event_payload = serde_json::json!({
374 "incident_id": incident.incident_id,
375 "draft_id": current_draft.draft_id,
376 "triage_run_id": triage_run_id,
377 "timeout_ms": timeout_ms,
378 });
379 if let Some(diagnostics) = diagnostics_value.as_ref() {
380 if let Some(obj) = event_payload.as_object_mut() {
381 obj.insert("diagnostics".to_string(), diagnostics.clone());
382 }
383 }
384 state.event_bus.publish(EngineEvent::new(
385 "bug_monitor.incident.triage_timed_out",
386 event_payload,
387 ));
388 }
389 }
390
391 recovered.push((current_draft.draft_id.clone(), incident_id));
392 }
393
394 Ok(recovered)
395}
396
397async fn recover_stale_bug_monitor_triage_event(
398 state: &AppState,
399 event: &EngineEvent,
400) -> anyhow::Result<Option<BugMonitorIncidentRecord>> {
401 if event.event_type != "automation_v2.run.paused_stale_no_provider_activity" {
402 return Ok(None);
403 }
404 let Some(triage_run_id) = first_string_deep(&event.properties, &["run_id", "runID"]) else {
405 return Ok(None);
406 };
407 let Some(draft) = ({
408 let guard = state.bug_monitor_drafts.read().await;
409 guard
410 .values()
411 .find(|draft| draft.triage_run_id.as_deref() == Some(triage_run_id.as_str()))
412 .cloned()
413 }) else {
414 return Ok(None);
415 };
416 if draft_has_github_issue(&draft) {
417 return Ok(None);
418 }
419
420 let timeout_ms = state
421 .bug_monitor_config()
422 .await
423 .triage_timeout_ms
424 .or_else(|| first_u64(&event.properties, &["stale_after_ms", "staleAfterMs"]))
425 .unwrap_or_default();
426 let diagnostics_value = crate::http::bug_monitor::bug_monitor_triage_timeout_diagnostics(
427 state,
428 &triage_run_id,
429 timeout_ms,
430 )
431 .await;
432 let last_post_error = compose_triage_timeout_last_post_error(
433 &triage_run_id,
434 timeout_ms,
435 diagnostics_value.as_ref(),
436 );
437 let marked_now = match state
438 .try_mark_triage_timed_out(&draft.draft_id, last_post_error.clone())
439 .await?
440 {
441 Some(current_draft) => Some(current_draft),
442 None => {
443 let Some(current_draft) = state.get_bug_monitor_draft(&draft.draft_id).await else {
444 return Ok(None);
445 };
446 if draft_has_github_issue(¤t_draft) || !draft_is_triage_timed_out(¤t_draft)
447 {
448 return Ok(None);
449 }
450 Some(current_draft)
451 }
452 };
453 let Some(current_draft) = marked_now else {
454 return Ok(None);
455 };
456
457 let incident_id =
458 bug_monitor_incident_for_draft(state, ¤t_draft.draft_id, &triage_run_id).await;
459 let Some(incident_id) = incident_id else {
460 return Ok(None);
461 };
462 let Some(mut incident) = state.get_bug_monitor_incident(&incident_id).await else {
463 return Ok(None);
464 };
465 let now = now_ms();
466 incident.status = "triage_timed_out".to_string();
467 incident.last_error = Some(
468 current_draft
469 .last_post_error
470 .clone()
471 .unwrap_or(last_post_error.clone()),
472 );
473 incident.updated_at_ms = now;
474 state.put_bug_monitor_incident(incident.clone()).await?;
475
476 if !draft_is_triage_timed_out(&draft) {
477 let mut event_payload = serde_json::json!({
478 "incident_id": incident.incident_id,
479 "draft_id": current_draft.draft_id,
480 "triage_run_id": triage_run_id,
481 "timeout_ms": timeout_ms,
482 "reason": "bug monitor triage automation paused after no provider activity",
483 });
484 if let Some(diagnostics) = diagnostics_value.as_ref() {
485 if let Some(obj) = event_payload.as_object_mut() {
486 obj.insert("diagnostics".to_string(), diagnostics.clone());
487 }
488 }
489 state.event_bus.publish(EngineEvent::new(
490 "bug_monitor.incident.triage_timed_out",
491 event_payload,
492 ));
493 }
494
495 match crate::bug_monitor_github::publish_draft(
496 state,
497 ¤t_draft.draft_id,
498 Some(&incident.incident_id),
499 crate::bug_monitor_github::PublishMode::Recovery,
500 )
501 .await
502 {
503 Ok(outcome) => {
504 incident.status = outcome.action;
505 incident.last_error = None;
506 }
507 Err(error) => {
508 incident.last_error = Some(truncate_text(&error.to_string(), 500));
509 }
510 }
511 incident.updated_at_ms = now_ms();
512 state.put_bug_monitor_incident(incident.clone()).await?;
513 Ok(Some(incident))
514}
515
516pub async fn collect_bug_monitor_excerpt(state: &AppState, properties: &Value) -> Vec<String> {
517 let mut excerpt = Vec::new();
518 if let Some(reason) = first_string(properties, &["reason", "error", "detail", "message"]) {
519 excerpt.push(reason);
520 }
521 if let Some(title) = first_string(properties, &["title", "task"]) {
522 if !excerpt.iter().any(|row| row == &title) {
523 excerpt.push(title);
524 }
525 }
526 let logs = state.logs.read().await;
527 for entry in logs.iter().rev().take(3) {
528 if let Some(message) = entry.get("message").and_then(|row| row.as_str()) {
529 excerpt.push(truncate_text(message, 240));
530 }
531 }
532 excerpt.truncate(8);
533 excerpt
534}
535
/// True when `value` holds a string with at least one non-whitespace character.
fn is_non_empty(value: &Option<String>) -> bool {
    match value {
        Some(text) => !text.trim().is_empty(),
        None => false,
    }
}
542
/// True when the event name, trimmed and ASCII-lowercased, contains one of the
/// routine progress/retry terms. A missing or blank name is never noise.
fn event_is_routine_noise(event: Option<&str>) -> bool {
    const NOISE_TERMS: [&str; 7] = [
        "progress",
        "heartbeat",
        "started",
        "queued",
        "retrying",
        "attempt.started",
        "minor_retry",
    ];

    let Some(raw) = event else {
        return false;
    };
    let lowered = raw.trim().to_ascii_lowercase();
    if lowered.is_empty() {
        return false;
    }
    NOISE_TERMS.iter().any(|term| lowered.contains(*term))
}
558
559pub fn evaluate_bug_monitor_submission_quality(
560 submission: &BugMonitorSubmission,
561) -> BugMonitorQualityGateReport {
562 let source_known = is_non_empty(&submission.source)
563 || is_non_empty(&submission.component)
564 || is_non_empty(&submission.process)
565 || is_non_empty(&submission.event);
566 let type_classified = is_non_empty(&submission.event) || is_non_empty(&submission.level);
567 let confidence_recorded = is_non_empty(&submission.confidence);
568 let dedupe_checked = is_non_empty(&submission.fingerprint);
569 let evidence_exists = !submission.evidence_refs.is_empty()
570 || !submission.excerpt.is_empty()
571 || is_non_empty(&submission.detail)
572 || is_non_empty(&submission.file_name);
573 let destination_clear = is_non_empty(&submission.expected_destination);
574 let risk_known = is_non_empty(&submission.risk_level);
575 let not_routine_noise = !event_is_routine_noise(submission.event.as_deref());
576
577 let gate_specs = [
578 (
579 "source_known",
580 "Source known",
581 source_known,
582 submission
583 .source
584 .clone()
585 .or_else(|| submission.component.clone())
586 .or_else(|| submission.process.clone())
587 .or_else(|| submission.event.clone()),
588 ),
589 (
590 "type_classified",
591 "Signal type classified",
592 type_classified,
593 submission
594 .event
595 .clone()
596 .or_else(|| submission.level.clone()),
597 ),
598 (
599 "confidence_recorded",
600 "Confidence recorded",
601 confidence_recorded,
602 submission.confidence.clone(),
603 ),
604 (
605 "dedupe_checked",
606 "Dedupe/fingerprint checked",
607 dedupe_checked,
608 submission.fingerprint.clone(),
609 ),
610 (
611 "evidence_present",
612 "Evidence or artifact refs present",
613 evidence_exists,
614 submission
615 .evidence_refs
616 .first()
617 .cloned()
618 .or_else(|| submission.excerpt.first().cloned())
619 .or_else(|| submission.file_name.clone()),
620 ),
621 (
622 "destination_clear",
623 "Expected destination clear",
624 destination_clear,
625 submission.expected_destination.clone(),
626 ),
627 (
628 "risk_known",
629 "Risk level known",
630 risk_known,
631 submission.risk_level.clone(),
632 ),
633 (
634 "not_routine_noise",
635 "Not routine progress or minor retry",
636 not_routine_noise,
637 submission.event.clone(),
638 ),
639 ];
640
641 let gates = gate_specs
642 .into_iter()
643 .map(|(key, label, passed, detail)| BugMonitorQualityGateResult {
644 key: key.to_string(),
645 label: label.to_string(),
646 passed,
647 detail,
648 })
649 .collect::<Vec<_>>();
650 let passed_count = gates.iter().filter(|gate| gate.passed).count();
651 let missing = gates
652 .iter()
653 .filter(|gate| !gate.passed)
654 .map(|gate| gate.key.clone())
655 .collect::<Vec<_>>();
656 let passed = passed_count == gates.len();
657 BugMonitorQualityGateReport {
658 stage: "intake_to_draft".to_string(),
659 status: if passed { "passed" } else { "blocked" }.to_string(),
660 passed,
661 passed_count,
662 total_count: gates.len(),
663 blocked_reason: if passed {
664 None
665 } else {
666 Some(format!("missing quality gates: {}", missing.join(", ")))
667 },
668 gates,
669 missing,
670 }
671}
672
/// Turns an engine event into a bug monitor incident and drives it through
/// duplicate suppression, draft creation, triage scheduling and publishing.
///
/// Steps, in order:
/// 1. Events recognised by `recursive_triage_skip_reason` are not processed as
///    new failures. They are offered to
///    `recover_stale_bug_monitor_triage_event`; if that yields no incident the
///    call fails with a "skipping recursive" error.
/// 2. A submission is built from the event and checked against known failure
///    patterns (at most 3 matches requested).
/// 3. The incident with the same fingerprint is updated (occurrence count,
///    timestamps, missing fields, new evidence refs), or a new `queued`
///    incident is created. Either way it is persisted.
/// 4. If duplicate matches exist, the incident becomes `duplicate_suppressed`,
///    an event is published and the function returns.
/// 5. Otherwise a draft is submitted; failure marks the incident
///    `draft_failed`, publishes `bug_monitor.incident.detected` and returns.
/// 6. A triage run is ensured for the draft, the draft is published in `Auto`
///    mode, and a triage deadline task is spawned when a triage run id and a
///    configured timeout both exist.
/// 7. The final incident is persisted and `bug_monitor.incident.detected` is
///    published.
///
/// # Errors
///
/// Fails when the event is a recursive triage event that could not be
/// recovered, when the submission cannot be built or has no fingerprint, or
/// when persisting the incident fails. Draft-submission, triage-run and
/// publish failures are recorded on the incident instead of being returned.
pub async fn process_event(
    state: &AppState,
    event: &EngineEvent,
    config: &BugMonitorConfig,
) -> anyhow::Result<BugMonitorIncidentRecord> {
    // Events produced by the bug monitor's own triage are only eligible for
    // stale-run recovery, never for a fresh incident.
    if let Some(reason) = recursive_triage_skip_reason(event) {
        if let Some(incident) = recover_stale_bug_monitor_triage_event(state, event).await? {
            return Ok(incident);
        }
        anyhow::bail!("skipping recursive bug monitor triage event: {reason}");
    }
    let submission = build_bug_monitor_submission_from_event(state, config, event).await?;
    // Duplicate lookup runs before the fingerprint check below, so a missing
    // repo or fingerprint is passed as an empty string here.
    let duplicate_matches = crate::http::bug_monitor::bug_monitor_failure_pattern_matches(
        state,
        submission.repo.as_deref().unwrap_or_default(),
        submission.fingerprint.as_deref().unwrap_or_default(),
        submission.title.as_deref(),
        submission.detail.as_deref(),
        &submission.excerpt,
        3,
    )
    .await;
    let fingerprint = submission
        .fingerprint
        .clone()
        .ok_or_else(|| anyhow::anyhow!("bug monitor submission fingerprint missing"))?;
    // The configured workspace root wins over the indexed one.
    let default_workspace_root = state.workspace_index.snapshot().await.root;
    let workspace_root = config
        .workspace_root
        .clone()
        .unwrap_or(default_workspace_root);
    let now = crate::util::time::now_ms();
    let quality_gate = evaluate_bug_monitor_submission_quality(&submission);

    // Incidents are deduplicated by fingerprint.
    let existing = state
        .bug_monitor_incidents
        .read()
        .await
        .values()
        .find(|row| row.fingerprint == fingerprint)
        .cloned();

    let mut incident = if let Some(mut row) = existing {
        // Repeat occurrence: bump counters and fill in only what is still missing.
        row.occurrence_count = row.occurrence_count.saturating_add(1);
        row.updated_at_ms = now;
        row.last_seen_at_ms = Some(now);
        if row.excerpt.is_empty() {
            row.excerpt = submission.excerpt.clone();
        }
        if row.confidence.is_none() {
            row.confidence = submission.confidence.clone();
        }
        if row.risk_level.is_none() {
            row.risk_level = submission.risk_level.clone();
        }
        if row.expected_destination.is_none() {
            row.expected_destination = submission.expected_destination.clone();
        }
        // The quality gate always reflects the latest submission.
        row.quality_gate = Some(quality_gate.clone());
        // Append evidence refs that are not recorded yet.
        for evidence_ref in &submission.evidence_refs {
            if !row
                .evidence_refs
                .iter()
                .any(|existing| existing == evidence_ref)
            {
                row.evidence_refs.push(evidence_ref.clone());
            }
        }
        row
    } else {
        // First occurrence: create a fresh incident in the `queued` state.
        BugMonitorIncidentRecord {
            incident_id: format!("failure-incident-{}", uuid::Uuid::new_v4().simple()),
            fingerprint: fingerprint.clone(),
            event_type: event.event_type.clone(),
            status: "queued".to_string(),
            repo: submission.repo.clone().unwrap_or_default(),
            workspace_root,
            title: submission
                .title
                .clone()
                .unwrap_or_else(|| format!("Failure detected in {}", event.event_type)),
            detail: submission.detail.clone(),
            excerpt: submission.excerpt.clone(),
            source: submission.source.clone(),
            run_id: submission.run_id.clone(),
            session_id: submission.session_id.clone(),
            correlation_id: submission.correlation_id.clone(),
            component: submission.component.clone(),
            level: submission.level.clone(),
            occurrence_count: 1,
            created_at_ms: now,
            updated_at_ms: now,
            last_seen_at_ms: Some(now),
            draft_id: None,
            triage_run_id: None,
            last_error: None,
            confidence: submission.confidence.clone(),
            risk_level: submission.risk_level.clone(),
            expected_destination: submission.expected_destination.clone(),
            evidence_refs: submission.evidence_refs.clone(),
            quality_gate: Some(quality_gate.clone()),
            duplicate_summary: None,
            duplicate_matches: None,
            event_payload: Some(event.properties.clone()),
        }
    };
    state.put_bug_monitor_incident(incident.clone()).await?;

    // Known failure pattern: record the duplicate information and stop here,
    // without creating a draft.
    if !duplicate_matches.is_empty() {
        incident.status = "duplicate_suppressed".to_string();
        let duplicate_summary =
            crate::http::bug_monitor::build_bug_monitor_duplicate_summary(&duplicate_matches);
        incident.duplicate_summary = Some(duplicate_summary.clone());
        incident.duplicate_matches = Some(duplicate_matches.clone());
        incident.updated_at_ms = crate::util::time::now_ms();
        state.put_bug_monitor_incident(incident.clone()).await?;
        state.event_bus.publish(EngineEvent::new(
            "bug_monitor.incident.duplicate_suppressed",
            serde_json::json!({
                "incident_id": incident.incident_id,
                "fingerprint": incident.fingerprint,
                "eventType": incident.event_type,
                "status": incident.status,
                "duplicate_summary": duplicate_summary,
                "duplicate_matches": duplicate_matches,
            }),
        ));
        return Ok(incident);
    }

    let draft = match state.submit_bug_monitor_draft(submission).await {
        Ok(draft) => draft,
        Err(error) => {
            // Draft creation failed: keep the incident, note the error and
            // still announce the detection.
            incident.status = "draft_failed".to_string();
            incident.last_error = Some(truncate_text(&error.to_string(), 500));
            incident.updated_at_ms = crate::util::time::now_ms();
            state.put_bug_monitor_incident(incident.clone()).await?;
            state.event_bus.publish(EngineEvent::new(
                "bug_monitor.incident.detected",
                serde_json::json!({
                    "incident_id": incident.incident_id,
                    "fingerprint": incident.fingerprint,
                    "eventType": incident.event_type,
                    "draft_id": incident.draft_id,
                    "triage_run_id": incident.triage_run_id,
                    "status": incident.status,
                    "detail": incident.last_error,
                }),
            ));
            return Ok(incident);
        }
    };
    incident.draft_id = Some(draft.draft_id.clone());
    incident.status = "draft_created".to_string();
    state.put_bug_monitor_incident(incident.clone()).await?;

    match crate::http::bug_monitor::ensure_bug_monitor_triage_run(
        state.clone(),
        &draft.draft_id,
        true,
    )
    .await
    {
        Ok((updated_draft, _run_id, _deduped)) => {
            incident.triage_run_id = updated_draft.triage_run_id.clone();
            // Only advance the status when a triage run actually exists.
            if incident.triage_run_id.is_some() {
                incident.status = "triage_queued".to_string();
            }
            incident.last_error = None;
        }
        Err(error) => {
            // Triage could not be scheduled; the draft itself remains valid.
            incident.status = "draft_created".to_string();
            incident.last_error = Some(truncate_text(&error.to_string(), 500));
        }
    }

    if let Some(draft_id) = incident.draft_id.clone() {
        // Snapshot of the stored draft taken before publishing; it is the base
        // for the failure record written if publishing fails.
        let latest_draft = state
            .get_bug_monitor_draft(&draft_id)
            .await
            .unwrap_or(draft.clone());
        match crate::bug_monitor_github::publish_draft(
            state,
            &draft_id,
            Some(&incident.incident_id),
            crate::bug_monitor_github::PublishMode::Auto,
        )
        .await
        {
            Ok(outcome) => {
                incident.status = outcome.action;
                incident.last_error = None;
            }
            Err(error) => {
                let detail = truncate_text(&error.to_string(), 500);
                incident.last_error = Some(detail.clone());
                let mut failed_draft = latest_draft;
                failed_draft.status = "github_post_failed".to_string();
                failed_draft.github_status = Some("github_post_failed".to_string());
                failed_draft.last_post_error = Some(detail.clone());
                let evidence_digest = failed_draft.evidence_digest.clone();
                // Both bookkeeping writes below are best effort: failures are
                // logged and do not abort event processing.
                if let Err(persist_err) = state.put_bug_monitor_draft(failed_draft.clone()).await {
                    tracing::warn!(
                        incident_id = %incident.incident_id,
                        draft_id = %failed_draft.draft_id,
                        error = %persist_err,
                        "failed to persist bug monitor draft after auto-post failure",
                    );
                }
                if let Err(record_err) = crate::bug_monitor_github::record_post_failure(
                    state,
                    &failed_draft,
                    Some(&incident.incident_id),
                    "auto_post",
                    evidence_digest.as_deref(),
                    &detail,
                )
                .await
                {
                    tracing::warn!(
                        incident_id = %incident.incident_id,
                        draft_id = %failed_draft.draft_id,
                        error = %record_err,
                        "failed to record bug monitor post failure",
                    );
                }
            }
        }

        // A deadline task is started only when a triage run exists and a
        // timeout is configured.
        // NOTE(review): `spawn_triage_deadline_task` is defined outside this
        // view; presumably it enforces the triage timeout — confirm.
        if let Some(triage_run_id) = incident.triage_run_id.clone() {
            if let Some(timeout_ms) = config.triage_timeout_ms {
                spawn_triage_deadline_task(
                    state.clone(),
                    incident.incident_id.clone(),
                    draft_id.clone(),
                    triage_run_id,
                    timeout_ms,
                );
            }
        }
    }

    // Persist the final state and announce the incident.
    incident.updated_at_ms = crate::util::time::now_ms();
    state.put_bug_monitor_incident(incident.clone()).await?;
    state.event_bus.publish(EngineEvent::new(
        "bug_monitor.incident.detected",
        serde_json::json!({
            "incident_id": incident.incident_id,
            "fingerprint": incident.fingerprint,
            "eventType": incident.event_type,
            "draft_id": incident.draft_id,
            "triage_run_id": incident.triage_run_id,
            "status": incident.status,
        }),
    ));
    Ok(incident)
}
939pub fn first_string(properties: &Value, keys: &[&str]) -> Option<String> {
940 for key in keys {
941 if let Some(value) = properties.get(*key).and_then(|row| row.as_str()) {
942 let trimmed = value.trim();
943 if !trimmed.is_empty() {
944 return Some(trimmed.to_string());
945 }
946 }
947 }
948 None
949}
950
951fn get_path_value<'a>(value: &'a Value, key: &str) -> Option<&'a Value> {
952 if key.contains('.') {
953 let mut current = value;
954 for part in key.split('.') {
955 current = current.get(part)?;
956 }
957 Some(current)
958 } else {
959 value.get(key)
960 }
961}
962
963fn first_value<'a>(properties: &'a Value, keys: &[&str]) -> Option<&'a Value> {
964 keys.iter().find_map(|key| get_path_value(properties, key))
965}
966
967fn first_string_deep(properties: &Value, keys: &[&str]) -> Option<String> {
968 for key in keys {
969 if let Some(value) = get_path_value(properties, key) {
970 if let Some(text) = value
971 .as_str()
972 .map(str::trim)
973 .filter(|value| !value.is_empty())
974 {
975 return Some(text.to_string());
976 }
977 if value.is_number() || value.is_boolean() {
978 return Some(value.to_string());
979 }
980 }
981 }
982 None
983}
984
985fn first_u64(properties: &Value, keys: &[&str]) -> Option<u64> {
986 for key in keys {
987 if let Some(value) = get_path_value(properties, key) {
988 if let Some(number) = value.as_u64() {
989 return Some(number);
990 }
991 if let Some(text) = value.as_str() {
992 if let Ok(number) = text.trim().parse::<u64>() {
993 return Some(number);
994 }
995 }
996 }
997 }
998 None
999}
1000
1001fn strings_from_value(value: Option<&Value>, max_items: usize) -> Vec<String> {
1002 let mut rows = match value {
1003 Some(Value::Array(items)) => items
1004 .iter()
1005 .filter_map(|item| {
1006 item.as_str()
1007 .map(str::trim)
1008 .filter(|text| !text.is_empty())
1009 .map(ToString::to_string)
1010 .or_else(|| {
1011 if item.is_object() || item.is_array() {
1012 Some(truncate_text(&sanitize_json_value(item).to_string(), 300))
1013 } else {
1014 None
1015 }
1016 })
1017 })
1018 .collect::<Vec<_>>(),
1019 Some(Value::String(text)) => text
1020 .lines()
1021 .map(str::trim)
1022 .filter(|text| !text.is_empty())
1023 .map(ToString::to_string)
1024 .collect::<Vec<_>>(),
1025 Some(value) if value.is_object() => {
1026 vec![truncate_text(&sanitize_json_value(value).to_string(), 300)]
1027 }
1028 _ => Vec::new(),
1029 };
1030 rows.truncate(max_items);
1031 rows
1032}
1033
/// Decides whether the value stored under a JSON object key must be redacted
/// before it is serialized into a report.
///
/// A key is sensitive when, compared ASCII case-insensitively, it contains
/// `token`, `secret`, `password`, `credential`, `authorization`, `apikey` or
/// `api-key`, or ends with `_key` / `-key`. CamelCase keys ending in `Key`
/// (`apiKey`, `privateKey`) are treated like their `_key` spellings.
///
/// A bare `key`, or words that merely end in those letters (`monkey`), are
/// not redacted.
fn redacted_key(key: &str) -> bool {
    const SENSITIVE_FRAGMENTS: [&str; 7] = [
        "token",
        "secret",
        "password",
        "credential",
        "authorization",
        "apikey",
        "api-key",
    ];

    let normalized = key.to_ascii_lowercase();
    if SENSITIVE_FRAGMENTS
        .iter()
        .any(|fragment| normalized.contains(*fragment))
    {
        return true;
    }
    if normalized.ends_with("_key") || normalized.ends_with("-key") {
        return true;
    }
    // camelCase: a capital-K `Key` suffix directly after a lowercase letter or
    // digit, e.g. `privateKey`.
    key.strip_suffix("Key")
        .and_then(|stem| stem.chars().last())
        .is_some_and(|previous| previous.is_ascii_lowercase() || previous.is_ascii_digit())
}
1044
1045fn sanitize_json_value(value: &Value) -> Value {
1046 match value {
1047 Value::Object(map) => Value::Object(
1048 map.iter()
1049 .map(|(key, value)| {
1050 if redacted_key(key) {
1051 (key.clone(), Value::String("[redacted]".to_string()))
1052 } else {
1053 (key.clone(), sanitize_json_value(value))
1054 }
1055 })
1056 .collect::<Map<String, Value>>(),
1057 ),
1058 Value::Array(items) => {
1059 Value::Array(items.iter().take(40).map(sanitize_json_value).collect())
1060 }
1061 Value::String(text) => Value::String(truncate_text(text, 1_000)),
1062 _ => value.clone(),
1063 }
1064}
1065
/// Formats a `label: value` line; a missing value renders as an empty string
/// after the separator.
fn field_line(label: &str, value: Option<String>) -> String {
    let rendered = match value {
        Some(text) => text,
        None => String::new(),
    };
    let mut line = String::with_capacity(label.len() + 2 + rendered.len());
    line.push_str(label);
    line.push_str(": ");
    line.push_str(&rendered);
    line
}
1069
1070pub async fn build_bug_monitor_submission_from_event(
1071 state: &AppState,
1072 config: &BugMonitorConfig,
1073 event: &EngineEvent,
1074) -> Result<BugMonitorSubmission> {
1075 let repo = config
1076 .repo
1077 .clone()
1078 .ok_or_else(|| anyhow::anyhow!("Bug Monitor repo is not configured"))?;
1079 let default_workspace_root = state.workspace_index.snapshot().await.root;
1080 let workspace_root = config
1081 .workspace_root
1082 .clone()
1083 .unwrap_or(default_workspace_root);
1084 let reason = first_string_deep(
1085 &event.properties,
1086 &[
1087 "reason",
1088 "error",
1089 "detail",
1090 "message",
1091 "summary",
1092 "task.last_error",
1093 "task.payload.reason",
1094 "task.payload.error",
1095 ],
1096 );
1097 let workflow_id = first_string_deep(&event.properties, &["workflow_id", "workflowID"]);
1098 let workflow_name = first_string_deep(&event.properties, &["workflow_name", "workflowName"]);
1099 let run_id = first_string_deep(&event.properties, &["run_id", "runID"]);
1100 let session_id = first_string_deep(&event.properties, &["session_id", "sessionID"]);
1101 let inferred_node_id = reason
1102 .as_deref()
1103 .and_then(node_id_from_failure_reason)
1104 .map(|value| truncate_text(&value, 160));
1105 let task_id = first_string_deep(&event.properties, &["task_id", "taskID", "task.id"])
1106 .or_else(|| inferred_node_id.clone());
1107 let stage_id = first_string_deep(&event.properties, &["stage_id", "stageID", "actionID"])
1108 .or_else(|| inferred_node_id.clone());
1109 let node_id = first_string_deep(&event.properties, &["node_id", "nodeID"])
1110 .or_else(|| inferred_node_id.clone());
1111 let automation_id = first_string_deep(&event.properties, &["automation_id", "automationID"]);
1112 let routine_id = first_string_deep(&event.properties, &["routine_id", "routineID"]);
1113 let agent_role = first_string_deep(&event.properties, &["agent_role", "agentRole"]);
1114 let error_kind = first_string_deep(
1115 &event.properties,
1116 &["error_kind", "errorKind", "failure_kind", "failureKind"],
1117 );
1118 let tool_name = first_string_deep(&event.properties, &["tool_name", "toolName", "tool"]);
1119 let suggested_next_action = first_string_deep(
1120 &event.properties,
1121 &["suggested_next_action", "suggestedNextAction"],
1122 );
1123 let expected_output =
1124 first_string_deep(&event.properties, &["expected_output", "expectedOutput"]).or_else(
1125 || {
1126 first_value(&event.properties, &["output_contract", "outputContract"])
1127 .map(|value| truncate_text(&sanitize_json_value(value).to_string(), 800))
1128 },
1129 );
1130 let actual_output = first_string_deep(&event.properties, &["actual_output", "actualOutput"]);
1131 let tool_args_summary =
1132 first_value(&event.properties, &["tool_args_summary", "toolArgsSummary"])
1133 .map(|value| truncate_text(&sanitize_json_value(value).to_string(), 800));
1134 let tool_result_excerpt = first_string_deep(
1135 &event.properties,
1136 &["tool_result_excerpt", "toolResultExcerpt"],
1137 );
1138 let attempt = first_u64(&event.properties, &["attempt", "task.attempt"]);
1139 let max_attempts = first_u64(
1140 &event.properties,
1141 &["max_attempts", "maxAttempts", "task.max_attempts"],
1142 );
1143 let retry_exhausted = first_value(&event.properties, &["retry_exhausted", "retryExhausted"])
1144 .and_then(Value::as_bool)
1145 .unwrap_or_else(|| {
1146 attempt
1147 .zip(max_attempts)
1148 .map(|(attempt, max)| max > 0 && attempt >= max)
1149 .unwrap_or(false)
1150 });
1151 let files_touched = strings_from_value(
1152 first_value(
1153 &event.properties,
1154 &["files_touched", "filesTouched", "changed_files"],
1155 ),
1156 20,
1157 );
1158 let artifact_refs = strings_from_value(
1159 first_value(
1160 &event.properties,
1161 &["artifact_refs", "artifactRefs", "artifacts"],
1162 ),
1163 20,
1164 );
1165 let mut evidence_refs = artifact_refs.clone();
1166 for evidence_ref in strings_from_value(
1167 first_value(&event.properties, &["evidence_refs", "evidenceRefs"]),
1168 20,
1169 ) {
1170 if !evidence_refs
1171 .iter()
1172 .any(|existing| existing == &evidence_ref)
1173 {
1174 evidence_refs.push(evidence_ref);
1175 }
1176 }
1177 let validation_errors = strings_from_value(
1178 first_value(
1179 &event.properties,
1180 &["validation_errors", "validationErrors"],
1181 ),
1182 12,
1183 );
1184 let missing_workspace_files = strings_from_value(
1185 first_value(
1186 &event.properties,
1187 &["missing_workspace_files", "missingWorkspaceFiles"],
1188 ),
1189 20,
1190 );
1191 let required_next_tool_actions = strings_from_value(
1192 first_value(
1193 &event.properties,
1194 &["required_next_tool_actions", "requiredNextToolActions"],
1195 ),
1196 20,
1197 );
1198 let recent_attempt_evidence = strings_from_value(
1199 first_value(
1200 &event.properties,
1201 &[
1202 "recent_node_attempt_evidence",
1203 "recentNodeAttemptEvidence",
1204 "prior_attempt_evidence",
1205 "priorAttemptEvidence",
1206 ],
1207 ),
1208 12,
1209 );
1210 let correlation_id = first_string_deep(
1211 &event.properties,
1212 &[
1213 "correlationID",
1214 "correlation_id",
1215 "commandID",
1216 "command_id",
1217 "eventID",
1218 ],
1219 );
1220 let component = first_string_deep(
1221 &event.properties,
1222 &[
1223 "component",
1224 "routine_id",
1225 "routineID",
1226 "workflow_id",
1227 "workflowID",
1228 "automation_id",
1229 "automationID",
1230 "node_id",
1231 "nodeID",
1232 "stage_id",
1233 "task",
1234 "title",
1235 ],
1236 );
1237 let confidence = first_string_deep(
1238 &event.properties,
1239 &["confidence", "signal_confidence", "signalConfidence"],
1240 )
1241 .map(|value| truncate_text(&value, 80))
1242 .or_else(|| Some("high".to_string()));
1243 let risk_level = first_string_deep(&event.properties, &["risk_level", "riskLevel", "risk"])
1244 .map(|value| truncate_text(&value, 80))
1245 .or_else(|| Some("medium".to_string()));
1246 let expected_destination = first_string_deep(
1247 &event.properties,
1248 &["expected_destination", "expectedDestination"],
1249 )
1250 .map(|value| truncate_text(&value, 120))
1251 .or_else(|| Some("bug_monitor_issue_draft".to_string()));
1252 let mut excerpt = collect_bug_monitor_excerpt(state, &event.properties).await;
1253 if excerpt.is_empty() {
1254 if let Some(reason) = reason.as_ref() {
1255 excerpt.push(reason.clone());
1256 }
1257 }
1258 let sanitized_properties = sanitize_json_value(&event.properties);
1259 let serialized = serde_json::to_string(&sanitized_properties).unwrap_or_default();
1260 let normalized_reason = normalize_reason_for_fingerprint(reason.as_deref().unwrap_or(""));
1261 let mut fingerprint = sha256_hex(&[
1262 repo.as_str(),
1263 workspace_root.as_str(),
1264 event.event_type.as_str(),
1265 normalized_reason.as_str(),
1266 workflow_id.as_deref().unwrap_or(""),
1267 task_id.as_deref().unwrap_or(""),
1268 stage_id.as_deref().unwrap_or(""),
1269 node_id.as_deref().unwrap_or(""),
1270 component.as_deref().unwrap_or(""),
1280 ]);
1281 if let Some(node_fingerprint) = existing_node_incident_fingerprint_for_aggregate_outcome(
1282 state,
1283 &repo,
1284 &workspace_root,
1285 &event.event_type,
1286 workflow_id.as_deref().or(automation_id.as_deref()),
1287 run_id.as_deref(),
1288 node_id
1289 .as_deref()
1290 .or(task_id.as_deref())
1291 .or(stage_id.as_deref()),
1292 reason.as_deref(),
1293 )
1294 .await
1295 {
1296 fingerprint = node_fingerprint;
1297 }
1298 let failure_place = stage_id
1299 .as_ref()
1300 .or(node_id.as_ref())
1301 .or(task_id.as_ref())
1302 .or(component.as_ref());
1303 let title_reason = reason
1304 .as_deref()
1305 .map(|row| truncate_text(row, 120))
1306 .unwrap_or_else(|| event.event_type.clone());
1307 let title = if let Some(workflow_id) = workflow_id.as_ref().or(automation_id.as_ref()) {
1308 if let Some(place) = failure_place {
1309 format!("Workflow {workflow_id} failed at {place}: {title_reason}")
1310 } else {
1311 format!("Workflow {workflow_id} failed: {title_reason}")
1312 }
1313 } else if let Some(routine_id) = routine_id.as_ref() {
1314 format!("Routine {routine_id} failed: {title_reason}")
1315 } else if let Some(component) = component.as_ref() {
1316 format!(
1317 "{} failure in {}: {}",
1318 event.event_type, component, title_reason
1319 )
1320 } else {
1321 format!("{}: {}", event.event_type, title_reason)
1322 };
1323 let mut detail_lines = vec![
1324 format!("event_type: {}", event.event_type),
1325 format!("repo: {}", repo),
1326 format!("workspace_root: {}", workspace_root),
1327 field_line("workflow_id", workflow_id.clone().or(automation_id.clone())),
1328 field_line("workflow_name", workflow_name.clone()),
1329 field_line("run_id", run_id.clone()),
1330 field_line("session_id", session_id.clone()),
1331 field_line("task_id", task_id.clone()),
1332 field_line("stage_id", stage_id.clone()),
1333 field_line("node_id", node_id.clone()),
1334 field_line("component", component.clone()),
1335 field_line("agent_role", agent_role.clone()),
1336 field_line("attempt", attempt.map(|value| value.to_string())),
1337 field_line("max_attempts", max_attempts.map(|value| value.to_string())),
1338 format!("retry_exhausted: {retry_exhausted}"),
1339 field_line("confidence", confidence.clone()),
1340 field_line("risk_level", risk_level.clone()),
1341 field_line("expected_destination", expected_destination.clone()),
1342 field_line("error_kind", error_kind.clone()),
1343 field_line("reason", reason.clone()),
1344 String::new(),
1345 "expected_output:".to_string(),
1346 expected_output.unwrap_or_default(),
1347 String::new(),
1348 "actual_output:".to_string(),
1349 actual_output.unwrap_or_default(),
1350 String::new(),
1351 field_line("tool", tool_name.clone()),
1352 "tool_args_summary:".to_string(),
1353 tool_args_summary.unwrap_or_default(),
1354 "tool_result_excerpt:".to_string(),
1355 tool_result_excerpt.unwrap_or_default(),
1356 String::new(),
1357 "artifact_refs:".to_string(),
1358 if artifact_refs.is_empty() {
1359 String::new()
1360 } else {
1361 artifact_refs.join("\n")
1362 },
1363 "files_touched:".to_string(),
1364 if files_touched.is_empty() {
1365 String::new()
1366 } else {
1367 files_touched.join("\n")
1368 },
1369 "validation_errors:".to_string(),
1370 if validation_errors.is_empty() {
1371 String::new()
1372 } else {
1373 validation_errors.join("\n")
1374 },
1375 "missing_workspace_files:".to_string(),
1376 if missing_workspace_files.is_empty() {
1377 String::new()
1378 } else {
1379 missing_workspace_files.join("\n")
1380 },
1381 "required_next_tool_actions:".to_string(),
1382 if required_next_tool_actions.is_empty() {
1383 String::new()
1384 } else {
1385 required_next_tool_actions.join("\n")
1386 },
1387 "recent_node_attempt_evidence:".to_string(),
1388 if recent_attempt_evidence.is_empty() {
1389 String::new()
1390 } else {
1391 recent_attempt_evidence.join("\n")
1392 },
1393 String::new(),
1394 "suggested_next_action:".to_string(),
1395 suggested_next_action.unwrap_or_default(),
1396 ];
1397 if !serialized.trim().is_empty() {
1398 detail_lines.push(String::new());
1399 detail_lines.push("payload:".to_string());
1400 detail_lines.push(truncate_text(&serialized, 4_000));
1401 }
1402
1403 Ok(BugMonitorSubmission {
1404 project_id: None,
1405 workspace_root: None,
1406 log_source_id: None,
1407 repo: Some(repo),
1408 title: Some(title),
1409 detail: Some(detail_lines.join("\n")),
1410 source: Some(
1411 first_string_deep(&event.properties, &["source"]).unwrap_or_else(|| {
1412 match event.event_type.as_str() {
1413 "automation_v2.run.failed" | "automation.run.failed" => "automation_v2",
1414 "automation_v2.run.paused_stale_no_provider_activity" => "automation_v2",
1415 "workflow.run.failed" | "workflow.validation.failed" => "autonomous_workflow",
1416 "routine.run.failed" => "routine",
1417 "context.task.failed" | "context.task.blocked" | "context.run.failed" => {
1418 "context_run"
1419 }
1420 "coder.run.failed" => "coder",
1421 _ => "tandem_events",
1422 }
1423 .to_string()
1424 }),
1425 ),
1426 run_id,
1427 session_id,
1428 correlation_id,
1429 file_name: files_touched.first().cloned(),
1430 process: Some("tandem-engine".to_string()),
1431 component,
1432 event: Some(event.event_type.clone()),
1433 level: Some("error".to_string()),
1434 excerpt,
1435 fingerprint: Some(fingerprint),
1436 confidence,
1437 risk_level,
1438 expected_destination,
1439 evidence_refs,
1440 })
1441}
1442
1443fn spawn_triage_deadline_task(
1451 state: AppState,
1452 incident_id: String,
1453 draft_id: String,
1454 triage_run_id: String,
1455 timeout_ms: u64,
1456) {
1457 tokio::spawn(async move {
1458 if timeout_ms > 0 {
1459 tokio::time::sleep(std::time::Duration::from_millis(timeout_ms)).await;
1460 }
1461 if crate::http::bug_monitor::bug_monitor_triage_run_is_terminal(&state, &triage_run_id)
1462 .await
1463 {
1464 match crate::http::bug_monitor::finalize_completed_bug_monitor_triage(&state, &draft_id)
1465 .await
1466 {
1467 Ok(true) => return,
1468 Ok(false) => {}
1469 Err(error) => {
1470 tracing::warn!(
1471 incident_id = %incident_id,
1472 draft_id = %draft_id,
1473 triage_run_id = %triage_run_id,
1474 error = %error,
1475 "failed to finalize terminal Bug Monitor triage run at deadline",
1476 );
1477 }
1478 }
1479 }
1480 let now = crate::util::time::now_ms();
1481 let Some(mut draft) = state.get_bug_monitor_draft(&draft_id).await else {
1482 return;
1483 };
1484 let already_marked = draft_is_triage_timed_out(&draft);
1485 if draft_has_github_issue(&draft) {
1486 return;
1487 }
1488 let diagnostics_value = crate::http::bug_monitor::bug_monitor_triage_timeout_diagnostics(
1489 &state,
1490 &triage_run_id,
1491 timeout_ms,
1492 )
1493 .await;
1494 let last_post_error = compose_triage_timeout_last_post_error(
1495 &triage_run_id,
1496 timeout_ms,
1497 diagnostics_value.as_ref(),
1498 );
1499 if !already_marked {
1500 draft.github_status = Some("triage_timed_out".to_string());
1501 draft.last_post_error = Some(last_post_error.clone());
1502 if let Err(error) = state.put_bug_monitor_draft(draft.clone()).await {
1503 tracing::warn!(
1504 incident_id = %incident_id,
1505 draft_id = %draft_id,
1506 error = %error,
1507 "failed to persist bug monitor draft after triage deadline",
1508 );
1509 return;
1510 }
1511 }
1512 if let Some(mut incident) = state.get_bug_monitor_incident(&incident_id).await {
1513 incident.status = "triage_timed_out".to_string();
1514 incident.last_error = Some(
1515 draft
1516 .last_post_error
1517 .clone()
1518 .unwrap_or(last_post_error.clone()),
1519 );
1520 incident.updated_at_ms = now;
1521 if let Err(error) = state.put_bug_monitor_incident(incident.clone()).await {
1522 tracing::warn!(
1523 incident_id = %incident_id,
1524 error = %error,
1525 "failed to persist bug monitor incident after triage deadline",
1526 );
1527 }
1528 if !already_marked {
1529 let mut event_payload = serde_json::json!({
1530 "incident_id": incident_id,
1531 "draft_id": draft_id,
1532 "triage_run_id": triage_run_id,
1533 "timeout_ms": timeout_ms,
1534 });
1535 if let Some(diagnostics) = diagnostics_value.as_ref() {
1536 if let Some(obj) = event_payload.as_object_mut() {
1537 obj.insert("diagnostics".to_string(), diagnostics.clone());
1538 }
1539 }
1540 state.event_bus.publish(EngineEvent::new(
1541 "bug_monitor.incident.triage_timed_out",
1542 event_payload,
1543 ));
1544 }
1545 }
1546 if let Err(error) = crate::bug_monitor_github::publish_draft(
1547 &state,
1548 &draft_id,
1549 Some(&incident_id),
1550 crate::bug_monitor_github::PublishMode::Recovery,
1551 )
1552 .await
1553 {
1554 tracing::warn!(
1555 incident_id = %incident_id,
1556 draft_id = %draft_id,
1557 triage_run_id = %triage_run_id,
1558 error = %error,
1559 "fallback publish after triage deadline failed",
1560 );
1561 }
1562 });
1563}
1564
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Wraps `properties` in an `automation_v2.run.failed` engine event.
    fn event_with(properties: Value) -> EngineEvent {
        EngineEvent::new("automation_v2.run.failed", properties)
    }

    // ---- recursive_triage_skip_reason ------------------------------------

    #[test]
    fn recursive_triage_skip_reason_detects_triage_automation_id_prefix() {
        let properties = json!({
            "automation_id": "automation-v2-bug-monitor-triage-failure-draft-abc123",
            "agent_role": "agent_writer",
        });
        let event = event_with(properties);
        let skip_reason = recursive_triage_skip_reason(&event)
            .expect("triage automation_id prefix should trigger skip");
        assert!(skip_reason.contains("automation-v2-bug-monitor-triage-"));
    }

    #[test]
    fn recursive_triage_skip_reason_detects_workflow_id_alias() {
        let properties = json!({
            "workflow_id": "automation-v2-bug-monitor-triage-failure-draft-xyz",
        });
        let event = event_with(properties);
        let skip_reason = recursive_triage_skip_reason(&event);
        assert!(skip_reason.is_some());
    }

    #[test]
    fn recursive_triage_skip_reason_detects_triage_agent_role_when_id_missing() {
        let properties = json!({
            "agent_role": "bug_monitor_triage_agent",
        });
        let event = event_with(properties);
        let skip_reason =
            recursive_triage_skip_reason(&event).expect("triage agent_role should trigger skip");
        assert!(skip_reason.contains("bug_monitor_triage_agent"));
    }

    #[test]
    fn recursive_triage_skip_reason_passes_normal_workflow_failures() {
        let properties = json!({
            "automation_id": "automation-v2-9ee33834-bf6d-4f86-acb3-3cd41d9cef19",
            "agent_role": "agent_reddit_query_researcher",
        });
        let event = event_with(properties);
        let skip_reason = recursive_triage_skip_reason(&event);
        assert!(skip_reason.is_none());
    }

    #[test]
    fn recursive_triage_skip_reason_does_not_fire_when_automation_id_is_real() {
        // A triage agent role paired with a non-triage automation id must not skip.
        let properties = json!({
            "automation_id": "automation-v2-9ee33834-bf6d-4f86-acb3-3cd41d9cef19",
            "agent_role": "bug_monitor_triage_agent",
        });
        let event = event_with(properties);
        let skip_reason = recursive_triage_skip_reason(&event);
        assert!(skip_reason.is_none());
    }

    #[test]
    fn recursive_triage_skip_reason_handles_empty_properties() {
        let event = event_with(json!({}));
        let skip_reason = recursive_triage_skip_reason(&event);
        assert!(skip_reason.is_none());
    }

    // ---- normalize_reason_for_fingerprint --------------------------------

    #[test]
    fn normalize_reason_replaces_automation_run_id_in_artifact_path() {
        let raw = "required output `.tandem/runs/automation-v2-run-593051dc-78bf-4927-b7db-b831b81d8bdd/artifacts/collect-recent-files.json` was not created for node `collect_recent_files`";
        let normalized = normalize_reason_for_fingerprint(raw);
        let has_placeholder = normalized.contains("automation-v2-run-RUNID");
        assert!(
            has_placeholder,
            "expected RUNID placeholder, got: {normalized}"
        );
        let has_leftover = normalized.contains("593051dc");
        assert!(!has_leftover, "leftover run uuid: {normalized}");
    }

    #[test]
    fn normalize_reason_collapses_recurrences_to_same_fingerprint() {
        // Same failure, two different run ids.
        let first_run = "required output `.tandem/runs/automation-v2-run-593051dc-78bf-4927-b7db-b831b81d8bdd/artifacts/collect-recent-files.json` was not created for node `collect_recent_files`";
        let second_run = "required output `.tandem/runs/automation-v2-run-aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee/artifacts/collect-recent-files.json` was not created for node `collect_recent_files`";
        let first_normalized = normalize_reason_for_fingerprint(first_run);
        let second_normalized = normalize_reason_for_fingerprint(second_run);
        assert_eq!(first_normalized, second_normalized);
    }

    #[test]
    fn normalize_reason_preserves_numeric_values() {
        // Plain numbers (here the timeout) must keep reasons distinct.
        let short_timeout = "automation node `prepare_search_manifest` timed out after 180000 ms";
        let long_timeout = "automation node `prepare_search_manifest` timed out after 600000 ms";
        let short_normalized = normalize_reason_for_fingerprint(short_timeout);
        let long_normalized = normalize_reason_for_fingerprint(long_timeout);
        assert_ne!(short_normalized, long_normalized);
    }

    #[test]
    fn normalize_reason_replaces_bare_uuids() {
        let raw = "session 0251b4cc-14f3-48d1-8d81-a11c780c7d7c failed validation";
        let normalized = normalize_reason_for_fingerprint(raw);
        assert!(normalized.contains("UUID"), "got: {normalized}");
        let has_leftover = normalized.contains("0251b4cc");
        assert!(!has_leftover, "leftover uuid: {normalized}");
    }

    #[test]
    fn normalize_reason_is_idempotent_for_already_clean_text() {
        let clean = "failed to reach provider `openai-codex` at https://chatgpt.com/backend-api/codex (request error)";
        let normalized = normalize_reason_for_fingerprint(clean);
        assert_eq!(normalized, clean);
    }

    // ---- node_id_from_failure_reason -------------------------------------

    #[test]
    fn node_id_from_failure_reason_extracts_node_outcome() {
        let extracted = node_id_from_failure_reason(
            "automation run failed from node outcomes: research_sources",
        );
        assert_eq!(extracted.as_deref(), Some("research_sources"));
    }

    // ---- aggregate outcome matching --------------------------------------

    #[test]
    fn node_incident_matches_aggregate_outcome_only_for_concrete_node_failure() {
        // Every case is checked against the same aggregate-outcome coordinates.
        let matches_aggregate = |candidate: &BugMonitorIncidentRecord| {
            node_incident_matches_aggregate_outcome(
                candidate,
                "frumu-ai/tandem",
                "/workspace",
                "automation_v2.run.failed",
                "automation-v2-workflow",
                "automation-v2-run-1",
                "research_sources",
            )
        };

        let concrete_payload = json!({
            "workflow_id": "automation-v2-workflow",
            "run_id": "automation-v2-run-1",
            "node_id": "research_sources",
            "reason": "required_workspace_files_missing",
        });
        let incident = BugMonitorIncidentRecord {
            fingerprint: "node-fingerprint".to_string(),
            repo: "frumu-ai/tandem".to_string(),
            workspace_root: "/workspace".to_string(),
            event_type: "automation_v2.run.failed".to_string(),
            run_id: Some("automation-v2-run-1".to_string()),
            updated_at_ms: 10,
            event_payload: Some(concrete_payload),
            ..Default::default()
        };
        assert!(matches_aggregate(&incident));

        // An incident whose own reason is the aggregate outcome is rejected.
        let aggregate_payload = json!({
            "workflow_id": "automation-v2-workflow",
            "run_id": "automation-v2-run-1",
            "node_id": "research_sources",
            "reason": "automation run failed from node outcomes: research_sources",
        });
        let aggregate_incident = BugMonitorIncidentRecord {
            event_payload: Some(aggregate_payload),
            ..incident.clone()
        };
        assert!(!matches_aggregate(&aggregate_incident));

        // A concrete failure on a different node is rejected.
        let other_node_payload = json!({
            "workflow_id": "automation-v2-workflow",
            "run_id": "automation-v2-run-1",
            "node_id": "generate_report",
            "reason": "required_workspace_files_missing",
        });
        let wrong_node = BugMonitorIncidentRecord {
            event_payload: Some(other_node_payload),
            ..incident.clone()
        };
        assert!(!matches_aggregate(&wrong_node));
    }

    #[tokio::test]
    async fn aggregate_node_outcome_lookup_reuses_existing_node_fingerprint() {
        let state = AppState::new_starting("bug-monitor-aggregate-merge-test".to_string(), true);

        // Store a concrete node-level incident for the lookup to find.
        let stored_payload = json!({
            "workflow_id": "automation-v2-workflow",
            "run_id": "automation-v2-run-1",
            "node_id": "research_sources",
            "reason": "required_workspace_files_missing",
        });
        let node_incident = BugMonitorIncidentRecord {
            incident_id: "incident-node".to_string(),
            fingerprint: "node-fingerprint".to_string(),
            repo: "frumu-ai/tandem".to_string(),
            workspace_root: "/workspace".to_string(),
            event_type: "automation_v2.run.failed".to_string(),
            status: "draft_created".to_string(),
            title: "Node failure".to_string(),
            run_id: Some("automation-v2-run-1".to_string()),
            updated_at_ms: 10,
            event_payload: Some(stored_payload),
            ..Default::default()
        };
        state
            .put_bug_monitor_incident(node_incident)
            .await
            .expect("store incident");

        // The aggregate event for the same run names the node only in its reason.
        let aggregate_properties = json!({
            "repo": "frumu-ai/tandem",
            "workspace_root": "/workspace",
            "workflow_id": "automation-v2-workflow",
            "run_id": "automation-v2-run-1",
            "reason": "automation run failed from node outcomes: research_sources",
            "component": "automation_v2",
        });
        let event = event_with(aggregate_properties);
        let reason = first_string_deep(&event.properties, &["reason"]);
        let node_id = reason.as_deref().and_then(node_id_from_failure_reason);

        let reused = existing_node_incident_fingerprint_for_aggregate_outcome(
            &state,
            "frumu-ai/tandem",
            "/workspace",
            &event.event_type,
            Some("automation-v2-workflow"),
            Some("automation-v2-run-1"),
            node_id.as_deref(),
            reason.as_deref(),
        )
        .await;
        let fingerprint =
            reused.expect("aggregate should reuse concrete node incident fingerprint");

        assert_eq!(fingerprint, "node-fingerprint");
    }

    #[test]
    fn node_id_from_failure_reason_extracts_timed_out_node() {
        let extracted = node_id_from_failure_reason(
            "automation node `prepare_search_manifest` timed out after 180000 ms",
        );
        assert_eq!(extracted.as_deref(), Some("prepare_search_manifest"));
    }
}