Skip to main content

tandem_server/bug_monitor/
service.rs

1use anyhow::Result;
2use serde_json::Value;
3
4use crate::app::state::{sha256_hex, truncate_text, AppState};
5use crate::bug_monitor::types::BugMonitorIncidentRecord;
6use crate::bug_monitor::types::{BugMonitorConfig, BugMonitorSubmission};
7use crate::EngineEvent;
8
9pub async fn collect_bug_monitor_excerpt(state: &AppState, properties: &Value) -> Vec<String> {
10    let mut excerpt = Vec::new();
11    if let Some(reason) = first_string(properties, &["reason", "error", "detail", "message"]) {
12        excerpt.push(reason);
13    }
14    if let Some(title) = first_string(properties, &["title", "task"]) {
15        if !excerpt.iter().any(|row| row == &title) {
16            excerpt.push(title);
17        }
18    }
19    let logs = state.logs.read().await;
20    for entry in logs.iter().rev().take(3) {
21        if let Some(message) = entry.get("message").and_then(|row| row.as_str()) {
22            excerpt.push(truncate_text(message, 240));
23        }
24    }
25    excerpt.truncate(8);
26    excerpt
27}
28
29pub async fn process_event(
30    state: &AppState,
31    event: &EngineEvent,
32    config: &BugMonitorConfig,
33) -> anyhow::Result<BugMonitorIncidentRecord> {
34    let submission = build_bug_monitor_submission_from_event(state, config, event).await?;
35    let duplicate_matches = crate::http::bug_monitor::bug_monitor_failure_pattern_matches(
36        state,
37        submission.repo.as_deref().unwrap_or_default(),
38        submission.fingerprint.as_deref().unwrap_or_default(),
39        submission.title.as_deref(),
40        submission.detail.as_deref(),
41        &submission.excerpt,
42        3,
43    )
44    .await;
45    let fingerprint = submission
46        .fingerprint
47        .clone()
48        .ok_or_else(|| anyhow::anyhow!("bug monitor submission fingerprint missing"))?;
49    let default_workspace_root = state.workspace_index.snapshot().await.root;
50    let workspace_root = config
51        .workspace_root
52        .clone()
53        .unwrap_or(default_workspace_root);
54    let now = crate::util::time::now_ms();
55
56    let existing = state
57        .bug_monitor_incidents
58        .read()
59        .await
60        .values()
61        .find(|row| row.fingerprint == fingerprint)
62        .cloned();
63
64    let mut incident = if let Some(mut row) = existing {
65        row.occurrence_count = row.occurrence_count.saturating_add(1);
66        row.updated_at_ms = now;
67        row.last_seen_at_ms = Some(now);
68        if row.excerpt.is_empty() {
69            row.excerpt = submission.excerpt.clone();
70        }
71        row
72    } else {
73        BugMonitorIncidentRecord {
74            incident_id: format!("failure-incident-{}", uuid::Uuid::new_v4().simple()),
75            fingerprint: fingerprint.clone(),
76            event_type: event.event_type.clone(),
77            status: "queued".to_string(),
78            repo: submission.repo.clone().unwrap_or_default(),
79            workspace_root,
80            title: submission
81                .title
82                .clone()
83                .unwrap_or_else(|| format!("Failure detected in {}", event.event_type)),
84            detail: submission.detail.clone(),
85            excerpt: submission.excerpt.clone(),
86            source: submission.source.clone(),
87            run_id: submission.run_id.clone(),
88            session_id: submission.session_id.clone(),
89            correlation_id: submission.correlation_id.clone(),
90            component: submission.component.clone(),
91            level: submission.level.clone(),
92            occurrence_count: 1,
93            created_at_ms: now,
94            updated_at_ms: now,
95            last_seen_at_ms: Some(now),
96            draft_id: None,
97            triage_run_id: None,
98            last_error: None,
99            duplicate_summary: None,
100            duplicate_matches: None,
101            event_payload: Some(event.properties.clone()),
102        }
103    };
104    state.put_bug_monitor_incident(incident.clone()).await?;
105
106    if !duplicate_matches.is_empty() {
107        incident.status = "duplicate_suppressed".to_string();
108        let duplicate_summary =
109            crate::http::bug_monitor::build_bug_monitor_duplicate_summary(&duplicate_matches);
110        incident.duplicate_summary = Some(duplicate_summary.clone());
111        incident.duplicate_matches = Some(duplicate_matches.clone());
112        incident.updated_at_ms = crate::util::time::now_ms();
113        state.put_bug_monitor_incident(incident.clone()).await?;
114        state.event_bus.publish(EngineEvent::new(
115            "bug_monitor.incident.duplicate_suppressed",
116            serde_json::json!({
117                "incident_id": incident.incident_id,
118                "fingerprint": incident.fingerprint,
119                "eventType": incident.event_type,
120                "status": incident.status,
121                "duplicate_summary": duplicate_summary,
122                "duplicate_matches": duplicate_matches,
123            }),
124        ));
125        return Ok(incident);
126    }
127
128    let draft = match state.submit_bug_monitor_draft(submission).await {
129        Ok(draft) => draft,
130        Err(error) => {
131            incident.status = "draft_failed".to_string();
132            incident.last_error = Some(truncate_text(&error.to_string(), 500));
133            incident.updated_at_ms = crate::util::time::now_ms();
134            state.put_bug_monitor_incident(incident.clone()).await?;
135            state.event_bus.publish(EngineEvent::new(
136                "bug_monitor.incident.detected",
137                serde_json::json!({
138                    "incident_id": incident.incident_id,
139                    "fingerprint": incident.fingerprint,
140                    "eventType": incident.event_type,
141                    "draft_id": incident.draft_id,
142                    "triage_run_id": incident.triage_run_id,
143                    "status": incident.status,
144                    "detail": incident.last_error,
145                }),
146            ));
147            return Ok(incident);
148        }
149    };
150    incident.draft_id = Some(draft.draft_id.clone());
151    incident.status = "draft_created".to_string();
152    state.put_bug_monitor_incident(incident.clone()).await?;
153
154    match crate::http::bug_monitor::ensure_bug_monitor_triage_run(
155        state.clone(),
156        &draft.draft_id,
157        true,
158    )
159    .await
160    {
161        Ok((updated_draft, _run_id, _deduped)) => {
162            incident.triage_run_id = updated_draft.triage_run_id.clone();
163            if incident.triage_run_id.is_some() {
164                incident.status = "triage_queued".to_string();
165            }
166            incident.last_error = None;
167        }
168        Err(error) => {
169            incident.status = "draft_created".to_string();
170            incident.last_error = Some(truncate_text(&error.to_string(), 500));
171        }
172    }
173
174    if let Some(draft_id) = incident.draft_id.clone() {
175        let latest_draft = state
176            .get_bug_monitor_draft(&draft_id)
177            .await
178            .unwrap_or(draft.clone());
179        match crate::bug_monitor_github::publish_draft(
180            state,
181            &draft_id,
182            Some(&incident.incident_id),
183            crate::bug_monitor_github::PublishMode::Auto,
184        )
185        .await
186        {
187            Ok(outcome) => {
188                incident.status = outcome.action;
189                incident.last_error = None;
190            }
191            Err(error) => {
192                let detail = truncate_text(&error.to_string(), 500);
193                incident.last_error = Some(detail.clone());
194                let mut failed_draft = latest_draft;
195                failed_draft.status = "github_post_failed".to_string();
196                failed_draft.github_status = Some("github_post_failed".to_string());
197                failed_draft.last_post_error = Some(detail.clone());
198                let evidence_digest = failed_draft.evidence_digest.clone();
199                let _ = state.put_bug_monitor_draft(failed_draft.clone()).await;
200                let _ = crate::bug_monitor_github::record_post_failure(
201                    state,
202                    &failed_draft,
203                    Some(&incident.incident_id),
204                    "auto_post",
205                    evidence_digest.as_deref(),
206                    &detail,
207                )
208                .await;
209            }
210        }
211    }
212
213    incident.updated_at_ms = crate::util::time::now_ms();
214    state.put_bug_monitor_incident(incident.clone()).await?;
215    state.event_bus.publish(EngineEvent::new(
216        "bug_monitor.incident.detected",
217        serde_json::json!({
218            "incident_id": incident.incident_id,
219            "fingerprint": incident.fingerprint,
220            "eventType": incident.event_type,
221            "draft_id": incident.draft_id,
222            "triage_run_id": incident.triage_run_id,
223            "status": incident.status,
224        }),
225    ));
226    Ok(incident)
227}
228pub fn first_string(properties: &Value, keys: &[&str]) -> Option<String> {
229    for key in keys {
230        if let Some(value) = properties.get(*key).and_then(|row| row.as_str()) {
231            let trimmed = value.trim();
232            if !trimmed.is_empty() {
233                return Some(trimmed.to_string());
234            }
235        }
236    }
237    None
238}
239
240pub async fn build_bug_monitor_submission_from_event(
241    state: &AppState,
242    config: &BugMonitorConfig,
243    event: &EngineEvent,
244) -> Result<BugMonitorSubmission> {
245    let repo = config
246        .repo
247        .clone()
248        .ok_or_else(|| anyhow::anyhow!("Bug Monitor repo is not configured"))?;
249    let default_workspace_root = state.workspace_index.snapshot().await.root;
250    let workspace_root = config
251        .workspace_root
252        .clone()
253        .unwrap_or(default_workspace_root);
254    let reason = first_string(
255        &event.properties,
256        &["reason", "error", "detail", "message", "summary"],
257    );
258    let run_id = first_string(&event.properties, &["runID", "run_id"]);
259    let session_id = first_string(&event.properties, &["sessionID", "session_id"]);
260    let correlation_id = first_string(
261        &event.properties,
262        &["correlationID", "correlation_id", "commandID", "command_id"],
263    );
264    let component = first_string(
265        &event.properties,
266        &[
267            "component",
268            "routineID",
269            "routine_id",
270            "workflowID",
271            "workflow_id",
272            "task",
273            "title",
274        ],
275    );
276    let mut excerpt = collect_bug_monitor_excerpt(state, &event.properties).await;
277    if excerpt.is_empty() {
278        if let Some(reason) = reason.as_ref() {
279            excerpt.push(reason.clone());
280        }
281    }
282    let serialized = serde_json::to_string(&event.properties).unwrap_or_default();
283    let fingerprint = sha256_hex(&[
284        repo.as_str(),
285        workspace_root.as_str(),
286        event.event_type.as_str(),
287        reason.as_deref().unwrap_or(""),
288        run_id.as_deref().unwrap_or(""),
289        session_id.as_deref().unwrap_or(""),
290        correlation_id.as_deref().unwrap_or(""),
291        component.as_deref().unwrap_or(""),
292        serialized.as_str(),
293    ]);
294    let title = if let Some(component) = component.as_ref() {
295        format!("{} failure in {}", event.event_type, component)
296    } else {
297        format!("{} detected", event.event_type)
298    };
299    let mut detail_lines = vec![
300        format!("event_type: {}", event.event_type),
301        format!("workspace_root: {}", workspace_root),
302    ];
303    if let Some(reason) = reason.as_ref() {
304        detail_lines.push(format!("reason: {reason}"));
305    }
306    if let Some(run_id) = run_id.as_ref() {
307        detail_lines.push(format!("run_id: {run_id}"));
308    }
309    if let Some(session_id) = session_id.as_ref() {
310        detail_lines.push(format!("session_id: {session_id}"));
311    }
312    if let Some(correlation_id) = correlation_id.as_ref() {
313        detail_lines.push(format!("correlation_id: {correlation_id}"));
314    }
315    if let Some(component) = component.as_ref() {
316        detail_lines.push(format!("component: {component}"));
317    }
318    if !serialized.trim().is_empty() {
319        detail_lines.push(String::new());
320        detail_lines.push("payload:".to_string());
321        detail_lines.push(truncate_text(&serialized, 2_000));
322    }
323
324    Ok(BugMonitorSubmission {
325        repo: Some(repo),
326        title: Some(title),
327        detail: Some(detail_lines.join("\n")),
328        source: Some("tandem_events".to_string()),
329        run_id,
330        session_id,
331        correlation_id,
332        file_name: None,
333        process: Some("tandem-engine".to_string()),
334        component,
335        event: Some(event.event_type.clone()),
336        level: Some("error".to_string()),
337        excerpt,
338        fingerprint: Some(fingerprint),
339    })
340}