1use std::path::{Path, PathBuf};
2use std::time::Duration;
3
4use anyhow::Context;
5use serde_json::{json, Value};
6use tokio::fs;
7use tokio::io::{AsyncReadExt, AsyncSeekExt};
8
9use crate::{
10 AppState, BugMonitorDraftRecord, BugMonitorIncidentRecord, BugMonitorLogCandidate,
11 BugMonitorLogSource, BugMonitorLogSourceRuntimeStatus, BugMonitorLogSourceState,
12 BugMonitorLogStartPosition, BugMonitorLogWatcherStatus, BugMonitorMonitoredProject,
13 BugMonitorSubmission,
14};
15
/// Outcome of replaying the most recent log-source candidate.
#[derive(Debug, Clone)]
pub struct BugMonitorLogReplayResult {
    /// The matching incident record as it was stored when the replay started
    /// (before the replay re-submitted the candidate and bumped its counters).
    pub incident: BugMonitorIncidentRecord,
    /// Draft produced (or reused) by re-submitting the candidate; `None` when
    /// draft creation failed.
    pub draft: Option<BugMonitorDraftRecord>,
}
21
22pub async fn run_bug_monitor_log_watcher(state: AppState) {
23 state
24 .update_bug_monitor_log_watcher_status(|status| {
25 status.running = true;
26 status.last_error = None;
27 })
28 .await;
29 loop {
30 let now_ms = crate::now_ms();
31 if let Err(error) = poll_enabled_sources(&state, now_ms).await {
32 state
33 .update_bug_monitor_log_watcher_status(|status| {
34 status.running = true;
35 status.last_error = Some(error.to_string());
36 })
37 .await;
38 }
39 tokio::time::sleep(Duration::from_secs(1)).await;
40 }
41}
42
43async fn poll_enabled_sources(state: &AppState, now_ms: u64) -> anyhow::Result<()> {
44 let config = state.bug_monitor_config().await;
45 let enabled_projects = config
46 .monitored_projects
47 .iter()
48 .filter(|project| project.enabled && !project.paused)
49 .count();
50 let enabled_sources = config
51 .monitored_projects
52 .iter()
53 .filter(|project| project.enabled && !project.paused)
54 .flat_map(|project| project.log_sources.iter())
55 .filter(|source| source.enabled && !source.paused)
56 .count();
57 if !config.enabled || config.paused {
58 state
59 .update_bug_monitor_log_watcher_status(|status| {
60 status.running = true;
61 status.enabled_projects = enabled_projects;
62 status.enabled_sources = enabled_sources;
63 status.last_poll_at_ms = Some(now_ms);
64 })
65 .await;
66 return Ok(());
67 }
68
69 let previous_status = state.bug_monitor_log_watcher_status.read().await.clone();
70 let mut source_statuses = previous_status.sources;
71 for project in config
72 .monitored_projects
73 .iter()
74 .filter(|project| project.enabled && !project.paused)
75 {
76 for source in project
77 .log_sources
78 .iter()
79 .filter(|source| source.enabled && !source.paused)
80 {
81 let key = format!("{}/{}", project.project_id, source.source_id);
82 let last_poll_at = source_statuses
83 .iter()
84 .find(|row| format!("{}/{}", row.project_id, row.source_id) == key)
85 .and_then(|row| row.last_poll_at_ms)
86 .unwrap_or(0);
87 if now_ms < last_poll_at.saturating_add(source.watch_interval_seconds * 1000) {
88 continue;
89 }
90 let status = poll_log_source_once(state, project, source, now_ms).await?;
91 source_statuses.retain(|row| {
92 !(row.project_id == status.project_id && row.source_id == status.source_id)
93 });
94 source_statuses.push(status);
95 }
96 }
97 state
98 .update_bug_monitor_log_watcher_status(|status| {
99 status.running = true;
100 status.enabled_projects = enabled_projects;
101 status.enabled_sources = enabled_sources;
102 status.last_poll_at_ms = Some(now_ms);
103 status.last_error = None;
104 status.sources = source_statuses;
105 })
106 .await;
107 Ok(())
108}
109
/// Performs one poll of a single log source: stats the file, reads newly
/// appended bytes (bounded by `max_bytes_per_poll`), parses them into
/// candidates, submits non-duplicate candidates, persists the updated cursor
/// state, and returns the runtime status row for this source.
pub async fn poll_log_source_once(
    state: &AppState,
    project: &BugMonitorMonitoredProject,
    source: &BugMonitorLogSource,
    now_ms: u64,
) -> anyhow::Result<BugMonitorLogSourceRuntimeStatus> {
    let absolute_path = resolve_log_source_path(project, source)?;
    // Load the persisted per-source cursor, or start a fresh one on first poll.
    let mut source_state = state
        .get_bug_monitor_log_source_state(&project.project_id, &source.source_id)
        .await
        .unwrap_or_else(|| BugMonitorLogSourceState {
            project_id: project.project_id.clone(),
            source_id: source.source_id.clone(),
            path: absolute_path.display().to_string(),
            updated_at_ms: now_ms,
            ..BugMonitorLogSourceState::default()
        });
    // Keep the stored path current in case the resolved location changed.
    source_state.path = absolute_path.display().to_string();

    let metadata = match fs::metadata(&absolute_path).await {
        Ok(metadata) => metadata,
        // A missing log file is an unhealthy-but-tracked condition, not a hard
        // error: the error counter is persisted and the poll itself succeeds.
        Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
            source_state.last_error = Some("log file not found".to_string());
            source_state.consecutive_errors = source_state.consecutive_errors.saturating_add(1);
            source_state.updated_at_ms = now_ms;
            state
                .put_bug_monitor_log_source_state(source_state.clone())
                .await?;
            return Ok(status_from_state(
                &source_state,
                false,
                None,
                Some(now_ms),
                None,
                None,
            ));
        }
        Err(error) => return Err(error).context("failed to stat log file"),
    };
    let file_size = metadata.len();
    let inode = inode_for_metadata(&metadata);
    // First-ever poll of a start-at-End source: skip the existing backlog by
    // jumping the cursor to the current end of file, read nothing this pass.
    if source_state.offset == 0
        && source_state.inode.is_none()
        && matches!(source.start_position, BugMonitorLogStartPosition::End)
    {
        source_state.offset = file_size;
        source_state.inode = inode.clone();
        source_state.updated_at_ms = now_ms;
        source_state.last_error = None;
        source_state.consecutive_errors = 0;
        state
            .put_bug_monitor_log_source_state(source_state.clone())
            .await?;
        return Ok(status_from_state(
            &source_state,
            true,
            Some(file_size),
            Some(now_ms),
            None,
            None,
        ));
    }
    // Rotation/truncation detection: a changed inode or a file smaller than
    // the cursor means the file was replaced or truncated — restart from 0.
    if source_state
        .inode
        .as_ref()
        .zip(inode.as_ref())
        .is_some_and(|(a, b)| a != b)
        || file_size < source_state.offset
    {
        source_state.offset = 0;
        source_state.partial_line = None;
        source_state.partial_line_offset_start = None;
    }
    source_state.inode = inode.clone();

    // Cap each pass at max_bytes_per_poll so one poll cannot monopolize I/O.
    let read_len = file_size
        .saturating_sub(source_state.offset)
        .min(source.max_bytes_per_poll);
    if read_len == 0 {
        // Nothing new — persist the healthy heartbeat and return.
        source_state.updated_at_ms = now_ms;
        source_state.last_error = None;
        source_state.consecutive_errors = 0;
        state
            .put_bug_monitor_log_source_state(source_state.clone())
            .await?;
        return Ok(status_from_state(
            &source_state,
            true,
            Some(file_size),
            Some(now_ms),
            None,
            None,
        ));
    }

    let mut file = fs::File::open(&absolute_path).await?;
    file.seek(std::io::SeekFrom::Start(source_state.offset))
        .await?;
    let mut bytes = vec![0; read_len as usize];
    // NOTE(review): read_exact errors (UnexpectedEof) if the file shrinks
    // between the stat above and this read — the next pass resets via the
    // truncation check; confirm that is the intended recovery path.
    file.read_exact(&mut bytes).await?;
    // Carry over any partial trailing line from the previous pass so a line
    // split across polls is parsed once, as a whole.
    let parse_result = crate::bug_monitor::log_parser::parse_log_candidates(
        project,
        source,
        &absolute_path,
        inode.clone(),
        source_state.offset,
        &bytes,
        source_state.partial_line.clone(),
        source_state.partial_line_offset_start,
    );
    source_state.offset = source_state.offset.saturating_add(read_len);
    source_state.partial_line = parse_result.next_partial_line;
    source_state.partial_line_offset_start = parse_result.next_partial_line_offset_start;
    source_state.total_bytes_read = source_state.total_bytes_read.saturating_add(read_len);
    source_state.total_candidates = source_state
        .total_candidates
        .saturating_add(parse_result.candidates.len() as u64);
    source_state.updated_at_ms = now_ms;
    source_state.last_error = None;
    source_state.consecutive_errors = 0;

    let mut submitted = 0usize;
    let mut last_candidate_at_ms = None;
    let mut last_submitted_at_ms = None;
    // Submit at most max_candidates_per_poll candidates, skipping fingerprints
    // seen within the cooldown window.
    for candidate in parse_result
        .candidates
        .into_iter()
        .take(source.max_candidates_per_poll)
    {
        last_candidate_at_ms = Some(now_ms);
        if fingerprint_recent(
            &mut source_state,
            &candidate.fingerprint,
            source.fingerprint_cooldown_ms,
            now_ms,
        ) {
            continue;
        }
        if submit_log_candidate(state, project, source, candidate)
            .await?
            .is_some()
        {
            submitted += 1;
            last_submitted_at_ms = Some(now_ms);
        }
    }
    source_state.total_submitted = source_state
        .total_submitted
        .saturating_add(submitted as u64);
    prune_recent_fingerprints(&mut source_state, now_ms);
    state
        .put_bug_monitor_log_source_state(source_state.clone())
        .await?;
    Ok(status_from_state(
        &source_state,
        true,
        Some(file_size),
        Some(now_ms),
        last_candidate_at_ms,
        last_submitted_at_ms,
    ))
}
272
/// Turns a parsed log candidate into an incident (new or updated) and,
/// unless an existing draft already covers it, a new draft.
///
/// Returns `Ok(Some(draft))` when a draft exists or was created,
/// `Ok(None)` when draft creation failed (the failure is recorded on the
/// incident as status "draft_failed").
pub async fn submit_log_candidate(
    state: &AppState,
    project: &BugMonitorMonitoredProject,
    source: &BugMonitorLogSource,
    mut candidate: BugMonitorLogCandidate,
) -> anyhow::Result<Option<BugMonitorDraftRecord>> {
    // Persist the raw log excerpt as an evidence artifact and make sure the
    // candidate references it exactly once.
    let evidence_ref =
        crate::bug_monitor::log_artifacts::write_log_evidence_artifact(state, &candidate).await?;
    if !candidate
        .evidence_refs
        .iter()
        .any(|row| row == &evidence_ref)
    {
        candidate.evidence_refs.push(evidence_ref.clone());
    }
    let now = crate::now_ms();
    // Payload stored on the incident; replay relies on offset_start/offset_end
    // plus project_id/source_id being present here.
    let event_payload = json!({
        "project_id": project.project_id,
        "source_id": source.source_id,
        "log_path": candidate.path,
        "offset_start": candidate.offset_start,
        "offset_end": candidate.offset_end,
        "inode": candidate.inode,
        "evidence_artifact": evidence_ref,
        "detected_at_ms": now,
        "mcp_server": project.mcp_server,
        "model_policy": project.model_policy,
    });
    // Dedupe on (repo, fingerprint): reuse the latest matching incident, or
    // create a fresh queued one.
    let mut incident = latest_bug_monitor_incident_by_repo_fingerprint(
        state,
        &project.repo,
        &candidate.fingerprint,
    )
    .await
    .unwrap_or_else(|| BugMonitorIncidentRecord {
        incident_id: format!("failure-incident-{}", uuid::Uuid::new_v4().simple()),
        fingerprint: candidate.fingerprint.clone(),
        event_type: candidate.event.clone(),
        status: "queued".to_string(),
        repo: project.repo.clone(),
        workspace_root: project.workspace_root.clone(),
        title: candidate.title.clone(),
        detail: Some(candidate.detail.clone()),
        excerpt: candidate.excerpt.clone(),
        source: Some(candidate.source.clone()),
        component: candidate.component.clone(),
        level: Some(candidate.level.clone()),
        occurrence_count: 0,
        created_at_ms: now,
        updated_at_ms: now,
        last_seen_at_ms: Some(now),
        confidence: Some(candidate.confidence.clone()),
        risk_level: Some(candidate.risk_level.clone()),
        expected_destination: Some(candidate.expected_destination.clone()),
        evidence_refs: candidate.evidence_refs.clone(),
        event_payload: Some(event_payload.clone()),
        ..BugMonitorIncidentRecord::default()
    });
    // .max(1) guarantees at least one occurrence even for a fresh record
    // (which starts at 0 above).
    incident.occurrence_count = incident.occurrence_count.saturating_add(1).max(1);
    incident.updated_at_ms = now;
    incident.last_seen_at_ms = Some(now);
    if incident.workspace_root.trim().is_empty() {
        incident.workspace_root = project.workspace_root.clone();
    }
    merge_evidence_refs(&mut incident.evidence_refs, &candidate.evidence_refs);
    incident.event_payload = Some(merge_payload(incident.event_payload.clone(), event_payload));

    // If the incident already points at a live draft, just record the new
    // occurrence and reuse that draft — no duplicate drafts.
    if let Some(draft_id) = incident.draft_id.as_deref() {
        if let Some(draft) = state.get_bug_monitor_draft(draft_id).await {
            state.put_bug_monitor_incident(incident).await?;
            return Ok(Some(draft));
        }
    }

    // Persist the occurrence first, then attempt draft creation.
    state.put_bug_monitor_incident(incident.clone()).await?;
    let submission = BugMonitorSubmission {
        project_id: Some(project.project_id.clone()),
        workspace_root: Some(project.workspace_root.clone()),
        log_source_id: Some(source.source_id.clone()),
        repo: Some(project.repo.clone()),
        title: Some(candidate.title.clone()),
        detail: Some(candidate.detail.clone()),
        source: Some(candidate.source.clone()),
        file_name: Some(candidate.path.clone()),
        process: candidate.process.clone(),
        component: candidate.component.clone(),
        event: Some(candidate.event.clone()),
        level: Some(candidate.level.clone()),
        excerpt: candidate.excerpt.clone(),
        fingerprint: Some(candidate.fingerprint.clone()),
        confidence: Some(candidate.confidence.clone()),
        risk_level: Some(candidate.risk_level.clone()),
        expected_destination: Some(candidate.expected_destination.clone()),
        evidence_refs: candidate.evidence_refs.clone(),
        ..BugMonitorSubmission::default()
    };
    let mut draft = match state.submit_bug_monitor_draft(submission).await {
        Ok(draft) => draft,
        // Draft creation failure is recorded on the incident but is not a
        // hard error for the caller.
        Err(error) => {
            incident.status = "draft_failed".to_string();
            incident.last_error = Some(error.to_string());
            state.put_bug_monitor_incident(incident).await?;
            return Ok(None);
        }
    };
    // Projects that require approval force the draft into that gate.
    if project.require_approval_for_new_issues && draft.status != "approval_required" {
        draft.status = "approval_required".to_string();
        draft = state.put_bug_monitor_draft(draft).await?;
    }
    incident.draft_id = Some(draft.draft_id.clone());
    incident.status = "draft_created".to_string();
    state.put_bug_monitor_incident(incident.clone()).await?;
    // Optional auto-triage: best-effort — a triage failure leaves the draft
    // as-is rather than failing the submission.
    if project.auto_create_new_issues {
        if let Ok((updated_draft, _run_id, _deduped)) =
            crate::http::bug_monitor::ensure_bug_monitor_triage_run(
                state.clone(),
                &draft.draft_id,
                true,
            )
            .await
        {
            draft = updated_draft;
        }
    }
    Ok(Some(draft))
}
399
/// Resets one source's cursor back to the start of the file and clears all
/// dedupe/error bookkeeping, then refreshes the source's row in the shared
/// watcher status. The next poll will re-read from byte 0.
pub async fn reset_log_source_offset(
    state: &AppState,
    project: &BugMonitorMonitoredProject,
    source: &BugMonitorLogSource,
    now_ms: u64,
) -> anyhow::Result<BugMonitorLogSourceState> {
    let absolute_path = resolve_log_source_path(project, source)?;
    // The file may legitimately not exist yet; a missing file just yields no
    // inode/size and an unhealthy status row.
    let metadata = fs::metadata(&absolute_path).await.ok();
    let inode = metadata.as_ref().and_then(inode_for_metadata);
    let mut source_state = state
        .get_bug_monitor_log_source_state(&project.project_id, &source.source_id)
        .await
        .unwrap_or_else(|| BugMonitorLogSourceState {
            project_id: project.project_id.clone(),
            source_id: source.source_id.clone(),
            ..BugMonitorLogSourceState::default()
        });
    // Zero the cursor and drop carried-over parse and dedupe state so the
    // whole file is treated as new on the next poll.
    source_state.path = absolute_path.display().to_string();
    source_state.inode = inode;
    source_state.offset = 0;
    source_state.partial_line = None;
    source_state.partial_line_offset_start = None;
    source_state.last_line_hash = None;
    source_state.recent_fingerprints.clear();
    source_state.updated_at_ms = now_ms;
    source_state.last_error = None;
    source_state.consecutive_errors = 0;
    // Persist first, then publish the status row derived from the persisted
    // state — keeps store and published status consistent.
    let source_state = state
        .put_bug_monitor_log_source_state(source_state.clone())
        .await?;
    let file_size = metadata.as_ref().map(|metadata| metadata.len());
    state
        .update_bug_monitor_log_watcher_status(|status| {
            status.sources.retain(|row| {
                !(row.project_id == source_state.project_id
                    && row.source_id == source_state.source_id)
            });
            status.sources.push(status_from_state(
                &source_state,
                metadata.is_some(),
                file_size,
                None,
                None,
                None,
            ));
        })
        .await;
    Ok(source_state)
}
449
/// Re-submits the most recent candidate produced by this source, using the
/// byte offsets stored on its incident's event payload.
///
/// Returns `Ok(None)` when no replayable incident exists; errors when the
/// stored offsets are missing, invalid, or no longer parse to a candidate
/// (e.g. after the log file was rotated or rewritten).
pub async fn replay_latest_log_source_candidate(
    state: &AppState,
    project: &BugMonitorMonitoredProject,
    source: &BugMonitorLogSource,
) -> anyhow::Result<Option<BugMonitorLogReplayResult>> {
    let Some(incident) =
        latest_bug_monitor_incident_by_log_source(state, &project.project_id, &source.source_id)
            .await
    else {
        return Ok(None);
    };
    // Validate the stored payload before touching the file.
    let payload = incident
        .event_payload
        .as_ref()
        .ok_or_else(|| anyhow::anyhow!("latest log-source incident has no event payload"))?;
    let offset_start = payload
        .get("offset_start")
        .and_then(Value::as_u64)
        .ok_or_else(|| anyhow::anyhow!("latest log-source incident has no offset_start"))?;
    let offset_end = payload
        .get("offset_end")
        .and_then(Value::as_u64)
        .ok_or_else(|| anyhow::anyhow!("latest log-source incident has no offset_end"))?;
    if offset_end <= offset_start {
        anyhow::bail!("latest log-source incident has invalid offsets");
    }

    let absolute_path = resolve_log_source_path(project, source)?;
    let metadata = fs::metadata(&absolute_path)
        .await
        .with_context(|| format!("failed to stat log file {}", absolute_path.display()))?;
    // Guard against a rotated/truncated file whose current size no longer
    // covers the stored range.
    if metadata.len() < offset_end {
        anyhow::bail!("latest log-source incident offsets exceed current log file size");
    }
    let inode = inode_for_metadata(&metadata);
    // Re-read exactly the stored byte range and re-parse it.
    let mut file = fs::File::open(&absolute_path).await?;
    file.seek(std::io::SeekFrom::Start(offset_start)).await?;
    let read_len = offset_end.saturating_sub(offset_start);
    let mut bytes = vec![0; read_len as usize];
    file.read_exact(&mut bytes).await?;
    let parse_result = crate::bug_monitor::log_parser::parse_log_candidates(
        project,
        source,
        &absolute_path,
        inode,
        offset_start,
        &bytes,
        None,
        None,
    );
    // The replay only counts if the bytes still parse to a candidate with the
    // exact same offsets — i.e. the file content has not changed underneath.
    let Some(candidate) = parse_result.candidates.into_iter().find(|candidate| {
        candidate.offset_start == offset_start && candidate.offset_end == offset_end
    }) else {
        anyhow::bail!("latest log-source incident offsets no longer parse to a candidate");
    };
    let draft = submit_log_candidate(state, project, source, candidate).await?;
    // `incident` is the pre-replay snapshot; the store holds the bumped copy.
    Ok(Some(BugMonitorLogReplayResult { incident, draft }))
}
508
/// Resolves a log source's configured path (absolute, or relative to the
/// project's workspace root) and rejects paths that escape the workspace.
///
/// Containment is checked by canonicalizing the nearest *existing* ancestor
/// of the candidate path and requiring it to stay under the canonicalized
/// workspace root, so `..` segments and symlinked ancestors cannot escape.
///
/// NOTE(review): components below the nearest existing ancestor are returned
/// un-canonicalized; a symlink created there later could still escape —
/// confirm this TOCTOU window is acceptable for the threat model.
pub fn resolve_log_source_path(
    project: &BugMonitorMonitoredProject,
    source: &BugMonitorLogSource,
) -> anyhow::Result<PathBuf> {
    let workspace_root = PathBuf::from(&project.workspace_root);
    if !workspace_root.is_absolute() {
        anyhow::bail!("workspace_root must be absolute");
    }
    // Fall back to the raw root if it (or an ancestor) cannot be canonicalized.
    let workspace_canonical = nearest_existing_path(&workspace_root)
        .and_then(|path| path.canonicalize().ok())
        .unwrap_or(workspace_root.clone());
    let raw_path = PathBuf::from(&source.path);
    let candidate = if raw_path.is_absolute() {
        raw_path
    } else {
        workspace_root.join(raw_path)
    };
    let nearest = nearest_existing_path(&candidate)
        .ok_or_else(|| anyhow::anyhow!("log source path has no existing parent"))?;
    let nearest_canonical = nearest.canonicalize()?;
    if !nearest_canonical.starts_with(&workspace_canonical) {
        anyhow::bail!("log source path escapes workspace_root");
    }
    // Return the (possibly not-yet-existing) candidate, not the canonical form.
    Ok(candidate)
}
534
535async fn latest_bug_monitor_incident_by_repo_fingerprint(
536 state: &AppState,
537 repo: &str,
538 fingerprint: &str,
539) -> Option<BugMonitorIncidentRecord> {
540 state
541 .bug_monitor_incidents
542 .read()
543 .await
544 .values()
545 .filter(|row| row.repo == repo && row.fingerprint == fingerprint)
546 .max_by_key(|row| row.updated_at_ms)
547 .cloned()
548}
549
550async fn latest_bug_monitor_incident_by_log_source(
551 state: &AppState,
552 project_id: &str,
553 source_id: &str,
554) -> Option<BugMonitorIncidentRecord> {
555 state
556 .bug_monitor_incidents
557 .read()
558 .await
559 .values()
560 .filter(|row| {
561 row.event_payload.as_ref().is_some_and(|payload| {
562 payload
563 .get("project_id")
564 .and_then(Value::as_str)
565 .is_some_and(|value| value == project_id)
566 && payload
567 .get("source_id")
568 .and_then(Value::as_str)
569 .is_some_and(|value| value == source_id)
570 && payload
571 .get("offset_start")
572 .and_then(Value::as_u64)
573 .is_some()
574 && payload.get("offset_end").and_then(Value::as_u64).is_some()
575 })
576 })
577 .max_by_key(|row| row.updated_at_ms)
578 .cloned()
579}
580
581fn status_from_state(
582 state: &BugMonitorLogSourceState,
583 healthy: bool,
584 file_size: Option<u64>,
585 last_poll_at_ms: Option<u64>,
586 last_candidate_at_ms: Option<u64>,
587 last_submitted_at_ms: Option<u64>,
588) -> BugMonitorLogSourceRuntimeStatus {
589 BugMonitorLogSourceRuntimeStatus {
590 project_id: state.project_id.clone(),
591 source_id: state.source_id.clone(),
592 path: state.path.clone(),
593 healthy,
594 offset: state.offset,
595 inode: state.inode.clone(),
596 file_size,
597 last_poll_at_ms,
598 last_candidate_at_ms,
599 last_submitted_at_ms,
600 last_error: state.last_error.clone(),
601 consecutive_errors: state.consecutive_errors,
602 total_bytes_read: state.total_bytes_read,
603 total_candidates: state.total_candidates,
604 total_submitted: state.total_submitted,
605 }
606}
607
608fn fingerprint_recent(
609 state: &mut BugMonitorLogSourceState,
610 fingerprint: &str,
611 cooldown_ms: u64,
612 now_ms: u64,
613) -> bool {
614 let recent = state
615 .recent_fingerprints
616 .get(fingerprint)
617 .map(|seen| now_ms.saturating_sub(*seen) < cooldown_ms)
618 .unwrap_or(false);
619 state
620 .recent_fingerprints
621 .insert(fingerprint.to_string(), now_ms);
622 recent
623}
624
625fn prune_recent_fingerprints(state: &mut BugMonitorLogSourceState, now_ms: u64) {
626 let cutoff = now_ms.saturating_sub(24 * 60 * 60 * 1000);
627 state.recent_fingerprints.retain(|_, seen| *seen >= cutoff);
628 while state.recent_fingerprints.len() > 500 {
629 let Some(first) = state.recent_fingerprints.keys().next().cloned() else {
630 break;
631 };
632 state.recent_fingerprints.remove(&first);
633 }
634}
635
/// Appends each incoming evidence reference that is not already present,
/// preserving order, then caps the list at the 50 most recent entries by
/// dropping from the front.
fn merge_evidence_refs(existing: &mut Vec<String>, incoming: &[String]) {
    for candidate in incoming {
        let already_present = existing.iter().any(|entry| entry == candidate);
        if !already_present {
            existing.push(candidate.clone());
        }
    }
    // Keep only the newest 50 references.
    let overflow = existing.len().saturating_sub(50);
    if overflow > 0 {
        existing.drain(..overflow);
    }
}
647
648fn merge_payload(existing: Option<Value>, incoming: Value) -> Value {
649 let mut base = existing.unwrap_or_else(|| json!({}));
650 if let (Some(base), Some(incoming)) = (base.as_object_mut(), incoming.as_object()) {
651 for (key, value) in incoming {
652 base.insert(key.clone(), value.clone());
653 }
654 }
655 base
656}
657
/// Walks up from `path` toward the filesystem root and returns the first
/// component chain that actually exists, or `None` when no ancestor exists
/// (e.g. a fully nonexistent relative path).
fn nearest_existing_path(path: &Path) -> Option<PathBuf> {
    let mut probe = path.to_path_buf();
    while !probe.exists() {
        // pop() returns false once there is no parent left to strip.
        if !probe.pop() {
            return None;
        }
    }
    Some(probe)
}
669
/// Returns a stable identity for the log file so rotation/replacement can be
/// detected (the poll loop resets its offset when the inode changes).
#[cfg(unix)]
fn inode_for_metadata(metadata: &std::fs::Metadata) -> Option<String> {
    use std::os::unix::fs::MetadataExt;
    Some(metadata.ino().to_string())
}

/// Non-Unix platforms expose no inode; rotation detection then falls back to
/// the file-size-shrank check alone.
#[cfg(not(unix))]
fn inode_for_metadata(_metadata: &std::fs::Metadata) -> Option<String> {
    None
}
680
#[cfg(test)]
mod tests {
    use super::*;
    use crate::BugMonitorConfig;
    use tempfile::tempdir;
    use tokio::fs;

    // Fixture: a monitored project rooted at `root` with one log source.
    fn project(root: &Path) -> BugMonitorMonitoredProject {
        BugMonitorMonitoredProject {
            project_id: "customer-api".to_string(),
            name: "Customer API".to_string(),
            enabled: true,
            repo: "owner/customer-api".to_string(),
            workspace_root: root.display().to_string(),
            auto_create_new_issues: false,
            log_sources: vec![source()],
            ..BugMonitorMonitoredProject::default()
        }
    }

    // Fixture: a start-at-End source pointing at logs/app.log inside the
    // workspace.
    fn source() -> BugMonitorLogSource {
        BugMonitorLogSource {
            source_id: "api-log".to_string(),
            path: "logs/app.log".to_string(),
            start_position: BugMonitorLogStartPosition::End,
            ..BugMonitorLogSource::default()
        }
    }

    // Fixture: an AppState whose persistence paths all live under `root`.
    fn test_state(root: &Path) -> AppState {
        let mut state = AppState::new_starting("test".to_string(), true);
        state.bug_monitor_config_path = root.join("config.json");
        state.bug_monitor_drafts_path = root.join("drafts.json");
        state.bug_monitor_incidents_path = root.join("incidents.json");
        state.bug_monitor_posts_path = root.join("posts.json");
        state.bug_monitor_log_watcher_state_path = root.join("log-watcher-state.json");
        state.bug_monitor_log_evidence_dir = root.join("evidence");
        state
    }

    // An absolute source path outside the workspace must be rejected.
    #[test]
    fn resolve_log_source_path_rejects_escape() {
        let dir = tempdir().unwrap();
        let outside = tempdir().unwrap();
        let mut source = source();
        source.path = outside.path().join("app.log").display().to_string();
        let err = resolve_log_source_path(&project(dir.path()), &source).unwrap_err();
        assert!(err.to_string().contains("escapes workspace_root"));
    }

    // First poll of an End source skips the backlog; a subsequent append is
    // parsed and produces exactly one draft and one incident.
    #[tokio::test]
    async fn poll_from_end_then_append_creates_draft_and_incident() {
        let dir = tempdir().unwrap();
        let state_dir = tempdir().unwrap();
        let logs = dir.path().join("logs");
        fs::create_dir_all(&logs).await.unwrap();
        let log_path = logs.join("app.log");
        fs::write(&log_path, "INFO booted\n").await.unwrap();

        let state = test_state(state_dir.path());
        let project = project(dir.path());
        state
            .put_bug_monitor_config(BugMonitorConfig {
                enabled: true,
                monitored_projects: vec![project.clone()],
                ..BugMonitorConfig::default()
            })
            .await
            .unwrap();

        // First poll: cursor jumps to EOF, nothing is submitted.
        let first = poll_log_source_once(&state, &project, &source(), 1_000)
            .await
            .unwrap();
        assert_eq!(first.offset, "INFO booted\n".len() as u64);
        assert_eq!(state.list_bug_monitor_incidents(10).await.len(), 0);

        // Append an error with a stack-frame line, then poll again.
        fs::write(
            &log_path,
            "INFO booted\nERROR upload failed\n    at normalize src/uploads.ts:42:1\n",
        )
        .await
        .unwrap();
        let second = poll_log_source_once(&state, &project, &source(), 2_000)
            .await
            .unwrap();
        assert_eq!(second.total_submitted, 1);
        let drafts = state.list_bug_monitor_drafts(10).await;
        let incidents = state.list_bug_monitor_incidents(10).await;
        assert_eq!(drafts.len(), 1);
        assert_eq!(incidents.len(), 1);
        assert_eq!(incidents[0].repo, "owner/customer-api");
        assert_eq!(
            incidents[0].workspace_root,
            dir.path().display().to_string()
        );
    }

    // After a reset, an End source re-reads the whole file from byte 0.
    #[tokio::test]
    async fn reset_offset_replays_from_beginning_for_end_source() {
        let dir = tempdir().unwrap();
        let state_dir = tempdir().unwrap();
        let logs = dir.path().join("logs");
        fs::create_dir_all(&logs).await.unwrap();
        let log_path = logs.join("app.log");
        fs::write(
            &log_path,
            "ERROR boot failed\n    at boot src/main.ts:1:1\n",
        )
        .await
        .unwrap();

        let state = test_state(state_dir.path());
        let project = project(dir.path());
        state
            .put_bug_monitor_config(BugMonitorConfig {
                enabled: true,
                monitored_projects: vec![project.clone()],
                ..BugMonitorConfig::default()
            })
            .await
            .unwrap();

        // First poll only positions the cursor at EOF.
        let first = poll_log_source_once(&state, &project, &source(), 1_000)
            .await
            .unwrap();
        assert_eq!(
            first.offset,
            "ERROR boot failed\n    at boot src/main.ts:1:1\n".len() as u64
        );
        assert_eq!(state.list_bug_monitor_incidents(10).await.len(), 0);

        let reset = reset_log_source_offset(&state, &project, &source(), 2_000)
            .await
            .unwrap();
        assert_eq!(reset.offset, 0);
        assert!(reset.inode.is_some());

        // The poll after the reset submits the previously skipped error.
        let second = poll_log_source_once(&state, &project, &source(), 3_000)
            .await
            .unwrap();
        assert_eq!(second.total_submitted, 1);
        assert_eq!(state.list_bug_monitor_drafts(10).await.len(), 1);
        assert_eq!(state.list_bug_monitor_incidents(10).await.len(), 1);
    }

    // Replaying re-reads the stored offsets and bumps the same incident's
    // occurrence count instead of creating a new incident.
    #[tokio::test]
    async fn replay_latest_candidate_uses_stored_log_offsets() {
        let dir = tempdir().unwrap();
        let state_dir = tempdir().unwrap();
        let logs = dir.path().join("logs");
        fs::create_dir_all(&logs).await.unwrap();
        let log_path = logs.join("app.log");
        fs::write(
            &log_path,
            "ERROR upload failed\n    at normalize src/uploads.ts:42:1\n",
        )
        .await
        .unwrap();

        let state = test_state(state_dir.path());
        let project = project(dir.path());
        // Beginning-positioned source so the very first poll submits.
        let mut replay_source = source();
        replay_source.start_position = BugMonitorLogStartPosition::Beginning;
        state
            .put_bug_monitor_config(BugMonitorConfig {
                enabled: true,
                monitored_projects: vec![BugMonitorMonitoredProject {
                    log_sources: vec![replay_source.clone()],
                    ..project.clone()
                }],
                ..BugMonitorConfig::default()
            })
            .await
            .unwrap();

        let first = poll_log_source_once(&state, &project, &replay_source, 1_000)
            .await
            .unwrap();
        assert_eq!(first.total_submitted, 1);
        let before = state.list_bug_monitor_incidents(10).await;
        assert_eq!(before.len(), 1);

        let replay = replay_latest_log_source_candidate(&state, &project, &replay_source)
            .await
            .unwrap()
            .expect("replayable latest candidate");
        assert_eq!(replay.incident.incident_id, before[0].incident_id);
        assert!(replay.draft.is_some());
        let after = state.list_bug_monitor_incidents(10).await;
        assert_eq!(after.len(), 1);
        assert_eq!(after[0].occurrence_count, 2);
    }
}