Skip to main content

tandem_server/bug_monitor/
log_watcher.rs

1use std::path::{Path, PathBuf};
2use std::time::Duration;
3
4use anyhow::Context;
5use serde_json::{json, Value};
6use tokio::fs;
7use tokio::io::{AsyncReadExt, AsyncSeekExt};
8
9use crate::{
10    AppState, BugMonitorDraftRecord, BugMonitorIncidentRecord, BugMonitorLogCandidate,
11    BugMonitorLogSource, BugMonitorLogSourceRuntimeStatus, BugMonitorLogSourceState,
12    BugMonitorLogStartPosition, BugMonitorLogWatcherStatus, BugMonitorMonitoredProject,
13    BugMonitorSubmission,
14};
15
16#[derive(Debug, Clone)]
17pub struct BugMonitorLogReplayResult {
18    pub incident: BugMonitorIncidentRecord,
19    pub draft: Option<BugMonitorDraftRecord>,
20}
21
22pub async fn run_bug_monitor_log_watcher(state: AppState) {
23    state
24        .update_bug_monitor_log_watcher_status(|status| {
25            status.running = true;
26            status.last_error = None;
27        })
28        .await;
29    loop {
30        let now_ms = crate::now_ms();
31        if let Err(error) = poll_enabled_sources(&state, now_ms).await {
32            state
33                .update_bug_monitor_log_watcher_status(|status| {
34                    status.running = true;
35                    status.last_error = Some(error.to_string());
36                })
37                .await;
38        }
39        tokio::time::sleep(Duration::from_secs(1)).await;
40    }
41}
42
43async fn poll_enabled_sources(state: &AppState, now_ms: u64) -> anyhow::Result<()> {
44    let config = state.bug_monitor_config().await;
45    let enabled_projects = config
46        .monitored_projects
47        .iter()
48        .filter(|project| project.enabled && !project.paused)
49        .count();
50    let enabled_sources = config
51        .monitored_projects
52        .iter()
53        .filter(|project| project.enabled && !project.paused)
54        .flat_map(|project| project.log_sources.iter())
55        .filter(|source| source.enabled && !source.paused)
56        .count();
57    if !config.enabled || config.paused {
58        state
59            .update_bug_monitor_log_watcher_status(|status| {
60                status.running = true;
61                status.enabled_projects = enabled_projects;
62                status.enabled_sources = enabled_sources;
63                status.last_poll_at_ms = Some(now_ms);
64            })
65            .await;
66        return Ok(());
67    }
68
69    let previous_status = state.bug_monitor_log_watcher_status.read().await.clone();
70    let mut source_statuses = previous_status.sources;
71    for project in config
72        .monitored_projects
73        .iter()
74        .filter(|project| project.enabled && !project.paused)
75    {
76        for source in project
77            .log_sources
78            .iter()
79            .filter(|source| source.enabled && !source.paused)
80        {
81            let key = format!("{}/{}", project.project_id, source.source_id);
82            let last_poll_at = source_statuses
83                .iter()
84                .find(|row| format!("{}/{}", row.project_id, row.source_id) == key)
85                .and_then(|row| row.last_poll_at_ms)
86                .unwrap_or(0);
87            if now_ms < last_poll_at.saturating_add(source.watch_interval_seconds * 1000) {
88                continue;
89            }
90            let status = poll_log_source_once(state, project, source, now_ms).await?;
91            source_statuses.retain(|row| {
92                !(row.project_id == status.project_id && row.source_id == status.source_id)
93            });
94            source_statuses.push(status);
95        }
96    }
97    state
98        .update_bug_monitor_log_watcher_status(|status| {
99            status.running = true;
100            status.enabled_projects = enabled_projects;
101            status.enabled_sources = enabled_sources;
102            status.last_poll_at_ms = Some(now_ms);
103            status.last_error = None;
104            status.sources = source_statuses;
105        })
106        .await;
107    Ok(())
108}
109
110pub async fn poll_log_source_once(
111    state: &AppState,
112    project: &BugMonitorMonitoredProject,
113    source: &BugMonitorLogSource,
114    now_ms: u64,
115) -> anyhow::Result<BugMonitorLogSourceRuntimeStatus> {
116    let absolute_path = resolve_log_source_path(project, source)?;
117    let mut source_state = state
118        .get_bug_monitor_log_source_state(&project.project_id, &source.source_id)
119        .await
120        .unwrap_or_else(|| BugMonitorLogSourceState {
121            project_id: project.project_id.clone(),
122            source_id: source.source_id.clone(),
123            path: absolute_path.display().to_string(),
124            updated_at_ms: now_ms,
125            ..BugMonitorLogSourceState::default()
126        });
127    source_state.path = absolute_path.display().to_string();
128
129    let metadata = match fs::metadata(&absolute_path).await {
130        Ok(metadata) => metadata,
131        Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
132            source_state.last_error = Some("log file not found".to_string());
133            source_state.consecutive_errors = source_state.consecutive_errors.saturating_add(1);
134            source_state.updated_at_ms = now_ms;
135            state
136                .put_bug_monitor_log_source_state(source_state.clone())
137                .await?;
138            return Ok(status_from_state(
139                &source_state,
140                false,
141                None,
142                Some(now_ms),
143                None,
144                None,
145            ));
146        }
147        Err(error) => return Err(error).context("failed to stat log file"),
148    };
149    let file_size = metadata.len();
150    let inode = inode_for_metadata(&metadata);
151    if source_state.offset == 0
152        && source_state.inode.is_none()
153        && matches!(source.start_position, BugMonitorLogStartPosition::End)
154    {
155        source_state.offset = file_size;
156        source_state.inode = inode.clone();
157        source_state.updated_at_ms = now_ms;
158        source_state.last_error = None;
159        source_state.consecutive_errors = 0;
160        state
161            .put_bug_monitor_log_source_state(source_state.clone())
162            .await?;
163        return Ok(status_from_state(
164            &source_state,
165            true,
166            Some(file_size),
167            Some(now_ms),
168            None,
169            None,
170        ));
171    }
172    if source_state
173        .inode
174        .as_ref()
175        .zip(inode.as_ref())
176        .is_some_and(|(a, b)| a != b)
177        || file_size < source_state.offset
178    {
179        source_state.offset = 0;
180        source_state.partial_line = None;
181        source_state.partial_line_offset_start = None;
182    }
183    source_state.inode = inode.clone();
184
185    let read_len = file_size
186        .saturating_sub(source_state.offset)
187        .min(source.max_bytes_per_poll);
188    if read_len == 0 {
189        source_state.updated_at_ms = now_ms;
190        source_state.last_error = None;
191        source_state.consecutive_errors = 0;
192        state
193            .put_bug_monitor_log_source_state(source_state.clone())
194            .await?;
195        return Ok(status_from_state(
196            &source_state,
197            true,
198            Some(file_size),
199            Some(now_ms),
200            None,
201            None,
202        ));
203    }
204
205    let mut file = fs::File::open(&absolute_path).await?;
206    file.seek(std::io::SeekFrom::Start(source_state.offset))
207        .await?;
208    let mut bytes = vec![0; read_len as usize];
209    file.read_exact(&mut bytes).await?;
210    let parse_result = crate::bug_monitor::log_parser::parse_log_candidates(
211        project,
212        source,
213        &absolute_path,
214        inode.clone(),
215        source_state.offset,
216        &bytes,
217        source_state.partial_line.clone(),
218        source_state.partial_line_offset_start,
219    );
220    source_state.offset = source_state.offset.saturating_add(read_len);
221    source_state.partial_line = parse_result.next_partial_line;
222    source_state.partial_line_offset_start = parse_result.next_partial_line_offset_start;
223    source_state.total_bytes_read = source_state.total_bytes_read.saturating_add(read_len);
224    source_state.total_candidates = source_state
225        .total_candidates
226        .saturating_add(parse_result.candidates.len() as u64);
227    source_state.updated_at_ms = now_ms;
228    source_state.last_error = None;
229    source_state.consecutive_errors = 0;
230
231    let mut submitted = 0usize;
232    let mut last_candidate_at_ms = None;
233    let mut last_submitted_at_ms = None;
234    for candidate in parse_result
235        .candidates
236        .into_iter()
237        .take(source.max_candidates_per_poll)
238    {
239        last_candidate_at_ms = Some(now_ms);
240        if fingerprint_recent(
241            &mut source_state,
242            &candidate.fingerprint,
243            source.fingerprint_cooldown_ms,
244            now_ms,
245        ) {
246            continue;
247        }
248        if submit_log_candidate(state, project, source, candidate)
249            .await?
250            .is_some()
251        {
252            submitted += 1;
253            last_submitted_at_ms = Some(now_ms);
254        }
255    }
256    source_state.total_submitted = source_state
257        .total_submitted
258        .saturating_add(submitted as u64);
259    prune_recent_fingerprints(&mut source_state, now_ms);
260    state
261        .put_bug_monitor_log_source_state(source_state.clone())
262        .await?;
263    Ok(status_from_state(
264        &source_state,
265        true,
266        Some(file_size),
267        Some(now_ms),
268        last_candidate_at_ms,
269        last_submitted_at_ms,
270    ))
271}
272
273pub async fn submit_log_candidate(
274    state: &AppState,
275    project: &BugMonitorMonitoredProject,
276    source: &BugMonitorLogSource,
277    mut candidate: BugMonitorLogCandidate,
278) -> anyhow::Result<Option<BugMonitorDraftRecord>> {
279    let evidence_ref =
280        crate::bug_monitor::log_artifacts::write_log_evidence_artifact(state, &candidate).await?;
281    if !candidate
282        .evidence_refs
283        .iter()
284        .any(|row| row == &evidence_ref)
285    {
286        candidate.evidence_refs.push(evidence_ref.clone());
287    }
288    let now = crate::now_ms();
289    let event_payload = json!({
290        "project_id": project.project_id,
291        "source_id": source.source_id,
292        "log_path": candidate.path,
293        "offset_start": candidate.offset_start,
294        "offset_end": candidate.offset_end,
295        "inode": candidate.inode,
296        "evidence_artifact": evidence_ref,
297        "detected_at_ms": now,
298        "mcp_server": project.mcp_server,
299        "model_policy": project.model_policy,
300    });
301    let mut incident = latest_bug_monitor_incident_by_repo_fingerprint(
302        state,
303        &project.repo,
304        &candidate.fingerprint,
305    )
306    .await
307    .unwrap_or_else(|| BugMonitorIncidentRecord {
308        incident_id: format!("failure-incident-{}", uuid::Uuid::new_v4().simple()),
309        fingerprint: candidate.fingerprint.clone(),
310        event_type: candidate.event.clone(),
311        status: "queued".to_string(),
312        repo: project.repo.clone(),
313        workspace_root: project.workspace_root.clone(),
314        title: candidate.title.clone(),
315        detail: Some(candidate.detail.clone()),
316        excerpt: candidate.excerpt.clone(),
317        source: Some(candidate.source.clone()),
318        component: candidate.component.clone(),
319        level: Some(candidate.level.clone()),
320        occurrence_count: 0,
321        created_at_ms: now,
322        updated_at_ms: now,
323        last_seen_at_ms: Some(now),
324        confidence: Some(candidate.confidence.clone()),
325        risk_level: Some(candidate.risk_level.clone()),
326        expected_destination: Some(candidate.expected_destination.clone()),
327        evidence_refs: candidate.evidence_refs.clone(),
328        event_payload: Some(event_payload.clone()),
329        ..BugMonitorIncidentRecord::default()
330    });
331    incident.occurrence_count = incident.occurrence_count.saturating_add(1).max(1);
332    incident.updated_at_ms = now;
333    incident.last_seen_at_ms = Some(now);
334    if incident.workspace_root.trim().is_empty() {
335        incident.workspace_root = project.workspace_root.clone();
336    }
337    merge_evidence_refs(&mut incident.evidence_refs, &candidate.evidence_refs);
338    incident.event_payload = Some(merge_payload(incident.event_payload.clone(), event_payload));
339
340    if let Some(draft_id) = incident.draft_id.as_deref() {
341        if let Some(draft) = state.get_bug_monitor_draft(draft_id).await {
342            state.put_bug_monitor_incident(incident).await?;
343            return Ok(Some(draft));
344        }
345    }
346
347    state.put_bug_monitor_incident(incident.clone()).await?;
348    let submission = BugMonitorSubmission {
349        project_id: Some(project.project_id.clone()),
350        workspace_root: Some(project.workspace_root.clone()),
351        log_source_id: Some(source.source_id.clone()),
352        repo: Some(project.repo.clone()),
353        title: Some(candidate.title.clone()),
354        detail: Some(candidate.detail.clone()),
355        source: Some(candidate.source.clone()),
356        file_name: Some(candidate.path.clone()),
357        process: candidate.process.clone(),
358        component: candidate.component.clone(),
359        event: Some(candidate.event.clone()),
360        level: Some(candidate.level.clone()),
361        excerpt: candidate.excerpt.clone(),
362        fingerprint: Some(candidate.fingerprint.clone()),
363        confidence: Some(candidate.confidence.clone()),
364        risk_level: Some(candidate.risk_level.clone()),
365        expected_destination: Some(candidate.expected_destination.clone()),
366        evidence_refs: candidate.evidence_refs.clone(),
367        ..BugMonitorSubmission::default()
368    };
369    let mut draft = match state.submit_bug_monitor_draft(submission).await {
370        Ok(draft) => draft,
371        Err(error) => {
372            incident.status = "draft_failed".to_string();
373            incident.last_error = Some(error.to_string());
374            state.put_bug_monitor_incident(incident).await?;
375            return Ok(None);
376        }
377    };
378    if project.require_approval_for_new_issues && draft.status != "approval_required" {
379        draft.status = "approval_required".to_string();
380        draft = state.put_bug_monitor_draft(draft).await?;
381    }
382    incident.draft_id = Some(draft.draft_id.clone());
383    incident.status = "draft_created".to_string();
384    state.put_bug_monitor_incident(incident.clone()).await?;
385    if project.auto_create_new_issues {
386        if let Ok((updated_draft, _run_id, _deduped)) =
387            crate::http::bug_monitor::ensure_bug_monitor_triage_run(
388                state.clone(),
389                &draft.draft_id,
390                true,
391            )
392            .await
393        {
394            draft = updated_draft;
395        }
396    }
397    Ok(Some(draft))
398}
399
400pub async fn reset_log_source_offset(
401    state: &AppState,
402    project: &BugMonitorMonitoredProject,
403    source: &BugMonitorLogSource,
404    now_ms: u64,
405) -> anyhow::Result<BugMonitorLogSourceState> {
406    let absolute_path = resolve_log_source_path(project, source)?;
407    let metadata = fs::metadata(&absolute_path).await.ok();
408    let inode = metadata.as_ref().and_then(inode_for_metadata);
409    let mut source_state = state
410        .get_bug_monitor_log_source_state(&project.project_id, &source.source_id)
411        .await
412        .unwrap_or_else(|| BugMonitorLogSourceState {
413            project_id: project.project_id.clone(),
414            source_id: source.source_id.clone(),
415            ..BugMonitorLogSourceState::default()
416        });
417    source_state.path = absolute_path.display().to_string();
418    source_state.inode = inode;
419    source_state.offset = 0;
420    source_state.partial_line = None;
421    source_state.partial_line_offset_start = None;
422    source_state.last_line_hash = None;
423    source_state.recent_fingerprints.clear();
424    source_state.updated_at_ms = now_ms;
425    source_state.last_error = None;
426    source_state.consecutive_errors = 0;
427    let source_state = state
428        .put_bug_monitor_log_source_state(source_state.clone())
429        .await?;
430    let file_size = metadata.as_ref().map(|metadata| metadata.len());
431    state
432        .update_bug_monitor_log_watcher_status(|status| {
433            status.sources.retain(|row| {
434                !(row.project_id == source_state.project_id
435                    && row.source_id == source_state.source_id)
436            });
437            status.sources.push(status_from_state(
438                &source_state,
439                metadata.is_some(),
440                file_size,
441                None,
442                None,
443                None,
444            ));
445        })
446        .await;
447    Ok(source_state)
448}
449
450pub async fn replay_latest_log_source_candidate(
451    state: &AppState,
452    project: &BugMonitorMonitoredProject,
453    source: &BugMonitorLogSource,
454) -> anyhow::Result<Option<BugMonitorLogReplayResult>> {
455    let Some(incident) =
456        latest_bug_monitor_incident_by_log_source(state, &project.project_id, &source.source_id)
457            .await
458    else {
459        return Ok(None);
460    };
461    let payload = incident
462        .event_payload
463        .as_ref()
464        .ok_or_else(|| anyhow::anyhow!("latest log-source incident has no event payload"))?;
465    let offset_start = payload
466        .get("offset_start")
467        .and_then(Value::as_u64)
468        .ok_or_else(|| anyhow::anyhow!("latest log-source incident has no offset_start"))?;
469    let offset_end = payload
470        .get("offset_end")
471        .and_then(Value::as_u64)
472        .ok_or_else(|| anyhow::anyhow!("latest log-source incident has no offset_end"))?;
473    if offset_end <= offset_start {
474        anyhow::bail!("latest log-source incident has invalid offsets");
475    }
476
477    let absolute_path = resolve_log_source_path(project, source)?;
478    let metadata = fs::metadata(&absolute_path)
479        .await
480        .with_context(|| format!("failed to stat log file {}", absolute_path.display()))?;
481    if metadata.len() < offset_end {
482        anyhow::bail!("latest log-source incident offsets exceed current log file size");
483    }
484    let inode = inode_for_metadata(&metadata);
485    let mut file = fs::File::open(&absolute_path).await?;
486    file.seek(std::io::SeekFrom::Start(offset_start)).await?;
487    let read_len = offset_end.saturating_sub(offset_start);
488    let mut bytes = vec![0; read_len as usize];
489    file.read_exact(&mut bytes).await?;
490    let parse_result = crate::bug_monitor::log_parser::parse_log_candidates(
491        project,
492        source,
493        &absolute_path,
494        inode,
495        offset_start,
496        &bytes,
497        None,
498        None,
499    );
500    let Some(candidate) = parse_result.candidates.into_iter().find(|candidate| {
501        candidate.offset_start == offset_start && candidate.offset_end == offset_end
502    }) else {
503        anyhow::bail!("latest log-source incident offsets no longer parse to a candidate");
504    };
505    let draft = submit_log_candidate(state, project, source, candidate).await?;
506    Ok(Some(BugMonitorLogReplayResult { incident, draft }))
507}
508
509pub fn resolve_log_source_path(
510    project: &BugMonitorMonitoredProject,
511    source: &BugMonitorLogSource,
512) -> anyhow::Result<PathBuf> {
513    let workspace_root = PathBuf::from(&project.workspace_root);
514    if !workspace_root.is_absolute() {
515        anyhow::bail!("workspace_root must be absolute");
516    }
517    let workspace_canonical = nearest_existing_path(&workspace_root)
518        .and_then(|path| path.canonicalize().ok())
519        .unwrap_or(workspace_root.clone());
520    let raw_path = PathBuf::from(&source.path);
521    let candidate = if raw_path.is_absolute() {
522        raw_path
523    } else {
524        workspace_root.join(raw_path)
525    };
526    let nearest = nearest_existing_path(&candidate)
527        .ok_or_else(|| anyhow::anyhow!("log source path has no existing parent"))?;
528    let nearest_canonical = nearest.canonicalize()?;
529    if !nearest_canonical.starts_with(&workspace_canonical) {
530        anyhow::bail!("log source path escapes workspace_root");
531    }
532    Ok(candidate)
533}
534
535async fn latest_bug_monitor_incident_by_repo_fingerprint(
536    state: &AppState,
537    repo: &str,
538    fingerprint: &str,
539) -> Option<BugMonitorIncidentRecord> {
540    state
541        .bug_monitor_incidents
542        .read()
543        .await
544        .values()
545        .filter(|row| row.repo == repo && row.fingerprint == fingerprint)
546        .max_by_key(|row| row.updated_at_ms)
547        .cloned()
548}
549
550async fn latest_bug_monitor_incident_by_log_source(
551    state: &AppState,
552    project_id: &str,
553    source_id: &str,
554) -> Option<BugMonitorIncidentRecord> {
555    state
556        .bug_monitor_incidents
557        .read()
558        .await
559        .values()
560        .filter(|row| {
561            row.event_payload.as_ref().is_some_and(|payload| {
562                payload
563                    .get("project_id")
564                    .and_then(Value::as_str)
565                    .is_some_and(|value| value == project_id)
566                    && payload
567                        .get("source_id")
568                        .and_then(Value::as_str)
569                        .is_some_and(|value| value == source_id)
570                    && payload
571                        .get("offset_start")
572                        .and_then(Value::as_u64)
573                        .is_some()
574                    && payload.get("offset_end").and_then(Value::as_u64).is_some()
575            })
576        })
577        .max_by_key(|row| row.updated_at_ms)
578        .cloned()
579}
580
581fn status_from_state(
582    state: &BugMonitorLogSourceState,
583    healthy: bool,
584    file_size: Option<u64>,
585    last_poll_at_ms: Option<u64>,
586    last_candidate_at_ms: Option<u64>,
587    last_submitted_at_ms: Option<u64>,
588) -> BugMonitorLogSourceRuntimeStatus {
589    BugMonitorLogSourceRuntimeStatus {
590        project_id: state.project_id.clone(),
591        source_id: state.source_id.clone(),
592        path: state.path.clone(),
593        healthy,
594        offset: state.offset,
595        inode: state.inode.clone(),
596        file_size,
597        last_poll_at_ms,
598        last_candidate_at_ms,
599        last_submitted_at_ms,
600        last_error: state.last_error.clone(),
601        consecutive_errors: state.consecutive_errors,
602        total_bytes_read: state.total_bytes_read,
603        total_candidates: state.total_candidates,
604        total_submitted: state.total_submitted,
605    }
606}
607
608fn fingerprint_recent(
609    state: &mut BugMonitorLogSourceState,
610    fingerprint: &str,
611    cooldown_ms: u64,
612    now_ms: u64,
613) -> bool {
614    let recent = state
615        .recent_fingerprints
616        .get(fingerprint)
617        .map(|seen| now_ms.saturating_sub(*seen) < cooldown_ms)
618        .unwrap_or(false);
619    state
620        .recent_fingerprints
621        .insert(fingerprint.to_string(), now_ms);
622    recent
623}
624
625fn prune_recent_fingerprints(state: &mut BugMonitorLogSourceState, now_ms: u64) {
626    let cutoff = now_ms.saturating_sub(24 * 60 * 60 * 1000);
627    state.recent_fingerprints.retain(|_, seen| *seen >= cutoff);
628    while state.recent_fingerprints.len() > 500 {
629        let Some(first) = state.recent_fingerprints.keys().next().cloned() else {
630            break;
631        };
632        state.recent_fingerprints.remove(&first);
633    }
634}
635
636fn merge_evidence_refs(existing: &mut Vec<String>, incoming: &[String]) {
637    for evidence_ref in incoming {
638        if !existing.iter().any(|row| row == evidence_ref) {
639            existing.push(evidence_ref.clone());
640        }
641    }
642    if existing.len() > 50 {
643        let keep_from = existing.len() - 50;
644        existing.drain(0..keep_from);
645    }
646}
647
648fn merge_payload(existing: Option<Value>, incoming: Value) -> Value {
649    let mut base = existing.unwrap_or_else(|| json!({}));
650    if let (Some(base), Some(incoming)) = (base.as_object_mut(), incoming.as_object()) {
651        for (key, value) in incoming {
652            base.insert(key.clone(), value.clone());
653        }
654    }
655    base
656}
657
658fn nearest_existing_path(path: &Path) -> Option<PathBuf> {
659    let mut current = path.to_path_buf();
660    loop {
661        if current.exists() {
662            return Some(current);
663        }
664        if !current.pop() {
665            return None;
666        }
667    }
668}
669
670#[cfg(unix)]
671fn inode_for_metadata(metadata: &std::fs::Metadata) -> Option<String> {
672    use std::os::unix::fs::MetadataExt;
673    Some(metadata.ino().to_string())
674}
675
676#[cfg(not(unix))]
677fn inode_for_metadata(_metadata: &std::fs::Metadata) -> Option<String> {
678    None
679}
680
681#[cfg(test)]
682mod tests {
683    use super::*;
684    use crate::BugMonitorConfig;
685    use tempfile::tempdir;
686    use tokio::fs;
687
688    fn project(root: &Path) -> BugMonitorMonitoredProject {
689        BugMonitorMonitoredProject {
690            project_id: "customer-api".to_string(),
691            name: "Customer API".to_string(),
692            enabled: true,
693            repo: "owner/customer-api".to_string(),
694            workspace_root: root.display().to_string(),
695            auto_create_new_issues: false,
696            log_sources: vec![source()],
697            ..BugMonitorMonitoredProject::default()
698        }
699    }
700
701    fn source() -> BugMonitorLogSource {
702        BugMonitorLogSource {
703            source_id: "api-log".to_string(),
704            path: "logs/app.log".to_string(),
705            start_position: BugMonitorLogStartPosition::End,
706            ..BugMonitorLogSource::default()
707        }
708    }
709
710    fn test_state(root: &Path) -> AppState {
711        let mut state = AppState::new_starting("test".to_string(), true);
712        state.bug_monitor_config_path = root.join("config.json");
713        state.bug_monitor_drafts_path = root.join("drafts.json");
714        state.bug_monitor_incidents_path = root.join("incidents.json");
715        state.bug_monitor_posts_path = root.join("posts.json");
716        state.bug_monitor_log_watcher_state_path = root.join("log-watcher-state.json");
717        state.bug_monitor_log_evidence_dir = root.join("evidence");
718        state
719    }
720
721    #[test]
722    fn resolve_log_source_path_rejects_escape() {
723        let dir = tempdir().unwrap();
724        let outside = tempdir().unwrap();
725        let mut source = source();
726        source.path = outside.path().join("app.log").display().to_string();
727        let err = resolve_log_source_path(&project(dir.path()), &source).unwrap_err();
728        assert!(err.to_string().contains("escapes workspace_root"));
729    }
730
731    #[tokio::test]
732    async fn poll_from_end_then_append_creates_draft_and_incident() {
733        let dir = tempdir().unwrap();
734        let state_dir = tempdir().unwrap();
735        let logs = dir.path().join("logs");
736        fs::create_dir_all(&logs).await.unwrap();
737        let log_path = logs.join("app.log");
738        fs::write(&log_path, "INFO booted\n").await.unwrap();
739
740        let state = test_state(state_dir.path());
741        let project = project(dir.path());
742        state
743            .put_bug_monitor_config(BugMonitorConfig {
744                enabled: true,
745                monitored_projects: vec![project.clone()],
746                ..BugMonitorConfig::default()
747            })
748            .await
749            .unwrap();
750
751        let first = poll_log_source_once(&state, &project, &source(), 1_000)
752            .await
753            .unwrap();
754        assert_eq!(first.offset, "INFO booted\n".len() as u64);
755        assert_eq!(state.list_bug_monitor_incidents(10).await.len(), 0);
756
757        fs::write(
758            &log_path,
759            "INFO booted\nERROR upload failed\n    at normalize src/uploads.ts:42:1\n",
760        )
761        .await
762        .unwrap();
763        let second = poll_log_source_once(&state, &project, &source(), 2_000)
764            .await
765            .unwrap();
766        assert_eq!(second.total_submitted, 1);
767        let drafts = state.list_bug_monitor_drafts(10).await;
768        let incidents = state.list_bug_monitor_incidents(10).await;
769        assert_eq!(drafts.len(), 1);
770        assert_eq!(incidents.len(), 1);
771        assert_eq!(incidents[0].repo, "owner/customer-api");
772        assert_eq!(
773            incidents[0].workspace_root,
774            dir.path().display().to_string()
775        );
776    }
777
778    #[tokio::test]
779    async fn reset_offset_replays_from_beginning_for_end_source() {
780        let dir = tempdir().unwrap();
781        let state_dir = tempdir().unwrap();
782        let logs = dir.path().join("logs");
783        fs::create_dir_all(&logs).await.unwrap();
784        let log_path = logs.join("app.log");
785        fs::write(
786            &log_path,
787            "ERROR boot failed\n    at boot src/main.ts:1:1\n",
788        )
789        .await
790        .unwrap();
791
792        let state = test_state(state_dir.path());
793        let project = project(dir.path());
794        state
795            .put_bug_monitor_config(BugMonitorConfig {
796                enabled: true,
797                monitored_projects: vec![project.clone()],
798                ..BugMonitorConfig::default()
799            })
800            .await
801            .unwrap();
802
803        let first = poll_log_source_once(&state, &project, &source(), 1_000)
804            .await
805            .unwrap();
806        assert_eq!(
807            first.offset,
808            "ERROR boot failed\n    at boot src/main.ts:1:1\n".len() as u64
809        );
810        assert_eq!(state.list_bug_monitor_incidents(10).await.len(), 0);
811
812        let reset = reset_log_source_offset(&state, &project, &source(), 2_000)
813            .await
814            .unwrap();
815        assert_eq!(reset.offset, 0);
816        assert!(reset.inode.is_some());
817
818        let second = poll_log_source_once(&state, &project, &source(), 3_000)
819            .await
820            .unwrap();
821        assert_eq!(second.total_submitted, 1);
822        assert_eq!(state.list_bug_monitor_drafts(10).await.len(), 1);
823        assert_eq!(state.list_bug_monitor_incidents(10).await.len(), 1);
824    }
825
826    #[tokio::test]
827    async fn replay_latest_candidate_uses_stored_log_offsets() {
828        let dir = tempdir().unwrap();
829        let state_dir = tempdir().unwrap();
830        let logs = dir.path().join("logs");
831        fs::create_dir_all(&logs).await.unwrap();
832        let log_path = logs.join("app.log");
833        fs::write(
834            &log_path,
835            "ERROR upload failed\n    at normalize src/uploads.ts:42:1\n",
836        )
837        .await
838        .unwrap();
839
840        let state = test_state(state_dir.path());
841        let project = project(dir.path());
842        let mut replay_source = source();
843        replay_source.start_position = BugMonitorLogStartPosition::Beginning;
844        state
845            .put_bug_monitor_config(BugMonitorConfig {
846                enabled: true,
847                monitored_projects: vec![BugMonitorMonitoredProject {
848                    log_sources: vec![replay_source.clone()],
849                    ..project.clone()
850                }],
851                ..BugMonitorConfig::default()
852            })
853            .await
854            .unwrap();
855
856        let first = poll_log_source_once(&state, &project, &replay_source, 1_000)
857            .await
858            .unwrap();
859        assert_eq!(first.total_submitted, 1);
860        let before = state.list_bug_monitor_incidents(10).await;
861        assert_eq!(before.len(), 1);
862
863        let replay = replay_latest_log_source_candidate(&state, &project, &replay_source)
864            .await
865            .unwrap()
866            .expect("replayable latest candidate");
867        assert_eq!(replay.incident.incident_id, before[0].incident_id);
868        assert!(replay.draft.is_some());
869        let after = state.list_bug_monitor_incidents(10).await;
870        assert_eq!(after.len(), 1);
871        assert_eq!(after[0].occurrence_count, 2);
872    }
873}