tandem_server/bug_monitor/
service.rs1use anyhow::Result;
2use serde_json::Value;
3
4use crate::app::state::{sha256_hex, truncate_text, AppState};
5use crate::bug_monitor::types::BugMonitorIncidentRecord;
6use crate::bug_monitor::types::{BugMonitorConfig, BugMonitorSubmission};
7use crate::EngineEvent;
8
9pub async fn collect_bug_monitor_excerpt(state: &AppState, properties: &Value) -> Vec<String> {
10 let mut excerpt = Vec::new();
11 if let Some(reason) = first_string(properties, &["reason", "error", "detail", "message"]) {
12 excerpt.push(reason);
13 }
14 if let Some(title) = first_string(properties, &["title", "task"]) {
15 if !excerpt.iter().any(|row| row == &title) {
16 excerpt.push(title);
17 }
18 }
19 let logs = state.logs.read().await;
20 for entry in logs.iter().rev().take(3) {
21 if let Some(message) = entry.get("message").and_then(|row| row.as_str()) {
22 excerpt.push(truncate_text(message, 240));
23 }
24 }
25 excerpt.truncate(8);
26 excerpt
27}
28
29pub async fn process_event(
30 state: &AppState,
31 event: &EngineEvent,
32 config: &BugMonitorConfig,
33) -> anyhow::Result<BugMonitorIncidentRecord> {
34 let submission = build_bug_monitor_submission_from_event(state, config, event).await?;
35 let duplicate_matches = crate::http::bug_monitor::bug_monitor_failure_pattern_matches(
36 state,
37 submission.repo.as_deref().unwrap_or_default(),
38 submission.fingerprint.as_deref().unwrap_or_default(),
39 submission.title.as_deref(),
40 submission.detail.as_deref(),
41 &submission.excerpt,
42 3,
43 )
44 .await;
45 let fingerprint = submission
46 .fingerprint
47 .clone()
48 .ok_or_else(|| anyhow::anyhow!("bug monitor submission fingerprint missing"))?;
49 let default_workspace_root = state.workspace_index.snapshot().await.root;
50 let workspace_root = config
51 .workspace_root
52 .clone()
53 .unwrap_or(default_workspace_root);
54 let now = crate::util::time::now_ms();
55
56 let existing = state
57 .bug_monitor_incidents
58 .read()
59 .await
60 .values()
61 .find(|row| row.fingerprint == fingerprint)
62 .cloned();
63
64 let mut incident = if let Some(mut row) = existing {
65 row.occurrence_count = row.occurrence_count.saturating_add(1);
66 row.updated_at_ms = now;
67 row.last_seen_at_ms = Some(now);
68 if row.excerpt.is_empty() {
69 row.excerpt = submission.excerpt.clone();
70 }
71 row
72 } else {
73 BugMonitorIncidentRecord {
74 incident_id: format!("failure-incident-{}", uuid::Uuid::new_v4().simple()),
75 fingerprint: fingerprint.clone(),
76 event_type: event.event_type.clone(),
77 status: "queued".to_string(),
78 repo: submission.repo.clone().unwrap_or_default(),
79 workspace_root,
80 title: submission
81 .title
82 .clone()
83 .unwrap_or_else(|| format!("Failure detected in {}", event.event_type)),
84 detail: submission.detail.clone(),
85 excerpt: submission.excerpt.clone(),
86 source: submission.source.clone(),
87 run_id: submission.run_id.clone(),
88 session_id: submission.session_id.clone(),
89 correlation_id: submission.correlation_id.clone(),
90 component: submission.component.clone(),
91 level: submission.level.clone(),
92 occurrence_count: 1,
93 created_at_ms: now,
94 updated_at_ms: now,
95 last_seen_at_ms: Some(now),
96 draft_id: None,
97 triage_run_id: None,
98 last_error: None,
99 duplicate_summary: None,
100 duplicate_matches: None,
101 event_payload: Some(event.properties.clone()),
102 }
103 };
104 state.put_bug_monitor_incident(incident.clone()).await?;
105
106 if !duplicate_matches.is_empty() {
107 incident.status = "duplicate_suppressed".to_string();
108 let duplicate_summary =
109 crate::http::bug_monitor::build_bug_monitor_duplicate_summary(&duplicate_matches);
110 incident.duplicate_summary = Some(duplicate_summary.clone());
111 incident.duplicate_matches = Some(duplicate_matches.clone());
112 incident.updated_at_ms = crate::util::time::now_ms();
113 state.put_bug_monitor_incident(incident.clone()).await?;
114 state.event_bus.publish(EngineEvent::new(
115 "bug_monitor.incident.duplicate_suppressed",
116 serde_json::json!({
117 "incident_id": incident.incident_id,
118 "fingerprint": incident.fingerprint,
119 "eventType": incident.event_type,
120 "status": incident.status,
121 "duplicate_summary": duplicate_summary,
122 "duplicate_matches": duplicate_matches,
123 }),
124 ));
125 return Ok(incident);
126 }
127
128 let draft = match state.submit_bug_monitor_draft(submission).await {
129 Ok(draft) => draft,
130 Err(error) => {
131 incident.status = "draft_failed".to_string();
132 incident.last_error = Some(truncate_text(&error.to_string(), 500));
133 incident.updated_at_ms = crate::util::time::now_ms();
134 state.put_bug_monitor_incident(incident.clone()).await?;
135 state.event_bus.publish(EngineEvent::new(
136 "bug_monitor.incident.detected",
137 serde_json::json!({
138 "incident_id": incident.incident_id,
139 "fingerprint": incident.fingerprint,
140 "eventType": incident.event_type,
141 "draft_id": incident.draft_id,
142 "triage_run_id": incident.triage_run_id,
143 "status": incident.status,
144 "detail": incident.last_error,
145 }),
146 ));
147 return Ok(incident);
148 }
149 };
150 incident.draft_id = Some(draft.draft_id.clone());
151 incident.status = "draft_created".to_string();
152 state.put_bug_monitor_incident(incident.clone()).await?;
153
154 match crate::http::bug_monitor::ensure_bug_monitor_triage_run(
155 state.clone(),
156 &draft.draft_id,
157 true,
158 )
159 .await
160 {
161 Ok((updated_draft, _run_id, _deduped)) => {
162 incident.triage_run_id = updated_draft.triage_run_id.clone();
163 if incident.triage_run_id.is_some() {
164 incident.status = "triage_queued".to_string();
165 }
166 incident.last_error = None;
167 }
168 Err(error) => {
169 incident.status = "draft_created".to_string();
170 incident.last_error = Some(truncate_text(&error.to_string(), 500));
171 }
172 }
173
174 if let Some(draft_id) = incident.draft_id.clone() {
175 let latest_draft = state
176 .get_bug_monitor_draft(&draft_id)
177 .await
178 .unwrap_or(draft.clone());
179 match crate::bug_monitor_github::publish_draft(
180 state,
181 &draft_id,
182 Some(&incident.incident_id),
183 crate::bug_monitor_github::PublishMode::Auto,
184 )
185 .await
186 {
187 Ok(outcome) => {
188 incident.status = outcome.action;
189 incident.last_error = None;
190 }
191 Err(error) => {
192 let detail = truncate_text(&error.to_string(), 500);
193 incident.last_error = Some(detail.clone());
194 let mut failed_draft = latest_draft;
195 failed_draft.status = "github_post_failed".to_string();
196 failed_draft.github_status = Some("github_post_failed".to_string());
197 failed_draft.last_post_error = Some(detail.clone());
198 let evidence_digest = failed_draft.evidence_digest.clone();
199 let _ = state.put_bug_monitor_draft(failed_draft.clone()).await;
200 let _ = crate::bug_monitor_github::record_post_failure(
201 state,
202 &failed_draft,
203 Some(&incident.incident_id),
204 "auto_post",
205 evidence_digest.as_deref(),
206 &detail,
207 )
208 .await;
209 }
210 }
211 }
212
213 incident.updated_at_ms = crate::util::time::now_ms();
214 state.put_bug_monitor_incident(incident.clone()).await?;
215 state.event_bus.publish(EngineEvent::new(
216 "bug_monitor.incident.detected",
217 serde_json::json!({
218 "incident_id": incident.incident_id,
219 "fingerprint": incident.fingerprint,
220 "eventType": incident.event_type,
221 "draft_id": incident.draft_id,
222 "triage_run_id": incident.triage_run_id,
223 "status": incident.status,
224 }),
225 ));
226 Ok(incident)
227}
228pub fn first_string(properties: &Value, keys: &[&str]) -> Option<String> {
229 for key in keys {
230 if let Some(value) = properties.get(*key).and_then(|row| row.as_str()) {
231 let trimmed = value.trim();
232 if !trimmed.is_empty() {
233 return Some(trimmed.to_string());
234 }
235 }
236 }
237 None
238}
239
240pub async fn build_bug_monitor_submission_from_event(
241 state: &AppState,
242 config: &BugMonitorConfig,
243 event: &EngineEvent,
244) -> Result<BugMonitorSubmission> {
245 let repo = config
246 .repo
247 .clone()
248 .ok_or_else(|| anyhow::anyhow!("Bug Monitor repo is not configured"))?;
249 let default_workspace_root = state.workspace_index.snapshot().await.root;
250 let workspace_root = config
251 .workspace_root
252 .clone()
253 .unwrap_or(default_workspace_root);
254 let reason = first_string(
255 &event.properties,
256 &["reason", "error", "detail", "message", "summary"],
257 );
258 let run_id = first_string(&event.properties, &["runID", "run_id"]);
259 let session_id = first_string(&event.properties, &["sessionID", "session_id"]);
260 let correlation_id = first_string(
261 &event.properties,
262 &["correlationID", "correlation_id", "commandID", "command_id"],
263 );
264 let component = first_string(
265 &event.properties,
266 &[
267 "component",
268 "routineID",
269 "routine_id",
270 "workflowID",
271 "workflow_id",
272 "task",
273 "title",
274 ],
275 );
276 let mut excerpt = collect_bug_monitor_excerpt(state, &event.properties).await;
277 if excerpt.is_empty() {
278 if let Some(reason) = reason.as_ref() {
279 excerpt.push(reason.clone());
280 }
281 }
282 let serialized = serde_json::to_string(&event.properties).unwrap_or_default();
283 let fingerprint = sha256_hex(&[
284 repo.as_str(),
285 workspace_root.as_str(),
286 event.event_type.as_str(),
287 reason.as_deref().unwrap_or(""),
288 run_id.as_deref().unwrap_or(""),
289 session_id.as_deref().unwrap_or(""),
290 correlation_id.as_deref().unwrap_or(""),
291 component.as_deref().unwrap_or(""),
292 serialized.as_str(),
293 ]);
294 let title = if let Some(component) = component.as_ref() {
295 format!("{} failure in {}", event.event_type, component)
296 } else {
297 format!("{} detected", event.event_type)
298 };
299 let mut detail_lines = vec![
300 format!("event_type: {}", event.event_type),
301 format!("workspace_root: {}", workspace_root),
302 ];
303 if let Some(reason) = reason.as_ref() {
304 detail_lines.push(format!("reason: {reason}"));
305 }
306 if let Some(run_id) = run_id.as_ref() {
307 detail_lines.push(format!("run_id: {run_id}"));
308 }
309 if let Some(session_id) = session_id.as_ref() {
310 detail_lines.push(format!("session_id: {session_id}"));
311 }
312 if let Some(correlation_id) = correlation_id.as_ref() {
313 detail_lines.push(format!("correlation_id: {correlation_id}"));
314 }
315 if let Some(component) = component.as_ref() {
316 detail_lines.push(format!("component: {component}"));
317 }
318 if !serialized.trim().is_empty() {
319 detail_lines.push(String::new());
320 detail_lines.push("payload:".to_string());
321 detail_lines.push(truncate_text(&serialized, 2_000));
322 }
323
324 Ok(BugMonitorSubmission {
325 repo: Some(repo),
326 title: Some(title),
327 detail: Some(detail_lines.join("\n")),
328 source: Some("tandem_events".to_string()),
329 run_id,
330 session_id,
331 correlation_id,
332 file_name: None,
333 process: Some("tandem-engine".to_string()),
334 component,
335 event: Some(event.event_type.clone()),
336 level: Some("error".to_string()),
337 excerpt,
338 fingerprint: Some(fingerprint),
339 })
340}