Skip to main content

tandem_server/bug_monitor/
log_parser.rs

1use std::path::Path;
2
3use serde_json::Value;
4
5use crate::{
6    BugMonitorLogCandidate, BugMonitorLogFormat, BugMonitorLogMinimumLevel, BugMonitorLogSource,
7    BugMonitorMonitoredProject,
8};
9
/// Outcome of one [`parse_log_candidates`] call over a chunk of log bytes.
#[derive(Debug, Clone, Default)]
pub struct BugMonitorLogParseResult {
    /// Candidates built from the complete lines found in the chunk.
    pub candidates: Vec<BugMonitorLogCandidate>,
    /// Text after the last newline of the chunk (no terminator seen yet);
    /// `None` when the chunk ended on a line terminator.
    pub next_partial_line: Option<String>,
    /// Byte offset recorded for the start of `next_partial_line`;
    /// `None` whenever `next_partial_line` is `None`.
    pub next_partial_line_offset_start: Option<u64>,
}
16
/// One complete log line together with the byte range it was assigned.
#[derive(Debug, Clone)]
struct ParsedLine {
    /// Line content with trailing `\r` / `\n` characters removed.
    text: String,
    /// Offset of the first byte of the line.
    offset_start: u64,
    /// Offset just past the line, including its terminator.
    offset_end: u64,
}
23
24pub fn parse_log_candidates(
25    project: &BugMonitorMonitoredProject,
26    source: &BugMonitorLogSource,
27    absolute_path: &Path,
28    inode: Option<String>,
29    offset_start: u64,
30    bytes: &[u8],
31    partial_line: Option<String>,
32    partial_line_offset_start: Option<u64>,
33) -> BugMonitorLogParseResult {
34    let decoded = String::from_utf8_lossy(bytes);
35    let mut combined = String::new();
36    if let Some(partial) = partial_line.as_ref() {
37        combined.push_str(partial);
38    }
39    combined.push_str(&decoded);
40    let mut next_partial_line = None;
41    let mut next_partial_line_offset_start = None;
42    let complete_text = if combined.ends_with('\n') || combined.ends_with('\r') {
43        combined
44    } else if let Some((complete, partial)) = combined.rsplit_once('\n') {
45        let complete_with_newline = format!("{complete}\n");
46        let partial_offset = offset_start
47            .saturating_add(bytes.len() as u64)
48            .saturating_sub(partial.as_bytes().len() as u64);
49        next_partial_line = Some(partial.to_string());
50        next_partial_line_offset_start = Some(partial_offset);
51        complete_with_newline
52    } else {
53        let start = partial_line_offset_start.unwrap_or(offset_start);
54        next_partial_line = Some(combined);
55        next_partial_line_offset_start = Some(start);
56        String::new()
57    };
58
59    let base_offset = partial_line_offset_start.unwrap_or(offset_start);
60    let mut cursor = base_offset;
61    let lines = complete_text
62        .split_inclusive('\n')
63        .map(|line| {
64            let clean = line.trim_end_matches(['\r', '\n']).to_string();
65            let start = cursor;
66            cursor = cursor.saturating_add(line.as_bytes().len() as u64);
67            ParsedLine {
68                text: clean,
69                offset_start: start,
70                offset_end: cursor,
71            }
72        })
73        .collect::<Vec<_>>();
74
75    let candidates = match source.format {
76        BugMonitorLogFormat::Json => {
77            parse_json_lines(project, source, absolute_path, inode, &lines)
78        }
79        BugMonitorLogFormat::Plaintext => {
80            parse_plaintext(project, source, absolute_path, inode, &lines)
81        }
82        BugMonitorLogFormat::Auto => {
83            let mut out = parse_json_lines(project, source, absolute_path, inode.clone(), &lines);
84            out.extend(parse_plaintext(
85                project,
86                source,
87                absolute_path,
88                inode,
89                &lines
90                    .into_iter()
91                    .filter(|line| serde_json::from_str::<Value>(&line.text).is_err())
92                    .collect::<Vec<_>>(),
93            ));
94            out
95        }
96    };
97
98    BugMonitorLogParseResult {
99        candidates,
100        next_partial_line,
101        next_partial_line_offset_start,
102    }
103}
104
105fn parse_json_lines(
106    project: &BugMonitorMonitoredProject,
107    source: &BugMonitorLogSource,
108    absolute_path: &Path,
109    inode: Option<String>,
110    lines: &[ParsedLine],
111) -> Vec<BugMonitorLogCandidate> {
112    lines
113        .iter()
114        .filter_map(|line| {
115            let value = serde_json::from_str::<Value>(&line.text).ok()?;
116            let level = first_json_string(&value, &["level", "severity", "log.level", "lvl"])
117                .unwrap_or_else(|| "info".to_string());
118            if !level_allowed(&level, &source.minimum_level) {
119                return None;
120            }
121            let message =
122                first_json_string(&value, &["message", "msg", "error", "exception.message"])
123                    .unwrap_or_else(|| line.text.clone());
124            let event = first_json_string(
125                &value,
126                &["event", "event.name", "error.kind", "exception.type"],
127            )
128            .unwrap_or_else(|| "log.error".to_string());
129            let component = first_json_string(
130                &value,
131                &["component", "service", "logger", "target", "module"],
132            );
133            let process = first_json_string(&value, &["process", "process.name", "service"]);
134            let stack = first_json_string(
135                &value,
136                &["stack", "stacktrace", "exception.stacktrace", "error.stack"],
137            );
138            let mut excerpt = vec![message.clone()];
139            if let Some(stack) = stack {
140                excerpt.extend(stack.lines().take(20).map(ToString::to_string));
141            }
142            Some(candidate_from_block(
143                project,
144                source,
145                absolute_path,
146                inode.clone(),
147                line.offset_start,
148                line.offset_end,
149                event,
150                level,
151                component,
152                process,
153                excerpt,
154            ))
155        })
156        .collect()
157}
158
159fn parse_plaintext(
160    project: &BugMonitorMonitoredProject,
161    source: &BugMonitorLogSource,
162    absolute_path: &Path,
163    inode: Option<String>,
164    lines: &[ParsedLine],
165) -> Vec<BugMonitorLogCandidate> {
166    let mut out = Vec::new();
167    let mut index = 0usize;
168    while index < lines.len() {
169        let line = &lines[index];
170        let Some(level) = detect_level(&line.text) else {
171            index += 1;
172            continue;
173        };
174        if !level_allowed(&level, &source.minimum_level) {
175            index += 1;
176            continue;
177        }
178        let start = index.saturating_sub(5);
179        let mut end = index + 1;
180        while end < lines.len() && end - index < 50 {
181            let text = lines[end].text.trim_start();
182            if end > index && is_new_log_line(text) && !is_continuation(text) {
183                break;
184            }
185            if end > index && !is_continuation(text) && detect_level(text).is_some() {
186                break;
187            }
188            end += 1;
189        }
190        let block = &lines[start..end];
191        let excerpt = block.iter().map(|row| row.text.clone()).collect::<Vec<_>>();
192        out.push(candidate_from_block(
193            project,
194            source,
195            absolute_path,
196            inode.clone(),
197            block
198                .first()
199                .map(|row| row.offset_start)
200                .unwrap_or(line.offset_start),
201            block
202                .last()
203                .map(|row| row.offset_end)
204                .unwrap_or(line.offset_end),
205            "log.error".to_string(),
206            level,
207            None,
208            None,
209            excerpt,
210        ));
211        index = end.max(index + 1);
212    }
213    out
214}
215
216fn candidate_from_block(
217    project: &BugMonitorMonitoredProject,
218    source: &BugMonitorLogSource,
219    absolute_path: &Path,
220    inode: Option<String>,
221    offset_start: u64,
222    offset_end: u64,
223    event: String,
224    level: String,
225    component: Option<String>,
226    process: Option<String>,
227    excerpt: Vec<String>,
228) -> BugMonitorLogCandidate {
229    let raw_excerpt_redacted = excerpt
230        .iter()
231        .take(200)
232        .map(|line| redact_text(line))
233        .collect::<Vec<_>>();
234    let excerpt = raw_excerpt_redacted
235        .iter()
236        .take(50)
237        .cloned()
238        .collect::<Vec<_>>();
239    let first = excerpt.first().cloned().unwrap_or_else(|| event.clone());
240    let fingerprint = build_fingerprint(project, source, &event, &first, excerpt.get(1));
241    BugMonitorLogCandidate {
242        project_id: project.project_id.clone(),
243        source_id: source.source_id.clone(),
244        repo: project.repo.clone(),
245        workspace_root: project.workspace_root.clone(),
246        path: absolute_path.display().to_string(),
247        offset_start,
248        offset_end,
249        inode,
250        title: format!("{} reported {}", project.name, summarize_title(&first)),
251        detail: format!(
252            "Detected {} log candidate in {} at byte offsets {}-{}.",
253            level,
254            absolute_path.display(),
255            offset_start,
256            offset_end
257        ),
258        source: format!("bug_monitor.log.{}", source.source_id),
259        process,
260        component,
261        event,
262        level,
263        excerpt,
264        raw_excerpt_redacted,
265        fingerprint,
266        confidence: "high".to_string(),
267        risk_level: "medium".to_string(),
268        expected_destination: "bug_monitor_issue_draft".to_string(),
269        evidence_refs: vec![format!(
270            "tandem://bug-monitor/{}/logs/{}#offset={}-{}",
271            project.project_id, source.source_id, offset_start, offset_end
272        )],
273        timestamp_ms: Some(crate::now_ms()),
274    }
275}
276
277fn first_json_string(value: &Value, keys: &[&str]) -> Option<String> {
278    keys.iter().find_map(|key| {
279        let mut current = value;
280        for part in key.split('.') {
281            current = current.get(part)?;
282        }
283        current
284            .as_str()
285            .map(str::trim)
286            .filter(|s| !s.is_empty())
287            .map(ToString::to_string)
288    })
289}
290
/// Guesses a severity from keywords anywhere in a plaintext line.
///
/// The match is a case-insensitive (ASCII) substring search. Error keywords
/// win over warning keywords; lines with neither yield `None`.
fn detect_level(text: &str) -> Option<String> {
    // Some entries are subsumed by others ("typeerror" contains "error");
    // they are kept to mirror the full keyword list.
    const ERROR_MARKERS: [&str; 9] = [
        "fatal",
        "panic",
        "critical",
        "error",
        "exception",
        "traceback",
        "typeerror",
        "referenceerror",
        "syntaxerror",
    ];
    const WARN_MARKERS: [&str; 2] = ["warn", "[warn]"];

    let haystack = text.to_ascii_lowercase();
    let mentions = |markers: &[&str]| markers.iter().any(|marker| haystack.contains(*marker));

    if mentions(&ERROR_MARKERS) {
        return Some("error".to_string());
    }
    if mentions(&WARN_MARKERS) {
        return Some("warn".to_string());
    }
    None
}
309
310fn level_allowed(level: &str, minimum: &BugMonitorLogMinimumLevel) -> bool {
311    let level = level.to_ascii_lowercase();
312    match minimum {
313        BugMonitorLogMinimumLevel::Error => {
314            matches!(level.as_str(), "error" | "fatal" | "panic" | "critical")
315                || level.contains("error")
316                || level.contains("fatal")
317                || level.contains("panic")
318        }
319        BugMonitorLogMinimumLevel::Warn => {
320            level_allowed(&level, &BugMonitorLogMinimumLevel::Error) || level.contains("warn")
321        }
322    }
323}
324
/// Tells whether a line reads as a continuation of the previous log entry.
///
/// Empty lines, lines starting with whitespace, and lines starting with one
/// of the stack-trace prefixes below are treated as continuations.
fn is_continuation(text: &str) -> bool {
    const PREFIXES: [&str; 5] = [
        "at ",
        "File \"",
        "Caused by",
        "Traceback",
        "stack backtrace",
    ];
    match text.chars().next() {
        None => true,
        Some(first) if first.is_whitespace() => true,
        Some(_) => PREFIXES.iter().any(|prefix| text.starts_with(*prefix)),
    }
}
334
/// Tells whether a line starts with a `YYYY-MM-DD` date, which is taken as
/// the start of a new log entry.
///
/// All ten leading bytes are checked: digits everywhere except the two `-`
/// separators. Checking only the separators would also accept ordinary
/// hyphenated text such as `user-id-12345`.
fn is_new_log_line(text: &str) -> bool {
    let bytes = text.as_bytes();
    bytes.len() >= 10
        && bytes[..10].iter().enumerate().all(|(position, byte)| match position {
            4 | 7 => *byte == b'-',
            _ => byte.is_ascii_digit(),
        })
}
340
/// Masks secret-looking values in a log line.
///
/// Every occurrence of `api_key=`, `token=`, `password=`, `secret=` and
/// `authorization: bearer ` (matched ASCII case-insensitively) has the value
/// that follows it, up to the next whitespace, replaced with `[redacted]`.
/// The marker itself is rewritten in its canonical spelling.
fn redact_text(text: &str) -> String {
    let mut out = text.to_string();
    for needle in ["api_key=", "token=", "password=", "secret="] {
        out = redact_values_after(&out, needle, needle);
    }
    redact_values_after(&out, "authorization: bearer ", "Authorization: Bearer ")
}

/// Replaces the value following every occurrence of `marker` in `input`.
///
/// `marker` must be lowercase ASCII. Each match is emitted as
/// `canonical` + `[redacted]`; the value runs from the end of the marker to
/// the next whitespace (or the end of the input).
fn redact_values_after(input: &str, marker: &str, canonical: &str) -> String {
    // ASCII lowercasing keeps byte offsets identical to `input`.
    let lowered = input.to_ascii_lowercase();
    let mut out = String::with_capacity(input.len());
    let mut cursor = 0;
    while let Some(found) = lowered[cursor..].find(marker) {
        let marker_start = cursor + found;
        // Measure the value from the END of the marker: the bearer marker
        // itself contains spaces, so searching from its start would stop
        // inside the marker and leave the token in place.
        let value_start = marker_start + marker.len();
        let value_end = input[value_start..]
            .find(char::is_whitespace)
            .map_or(input.len(), |rel| value_start + rel);
        out.push_str(&input[cursor..marker_start]);
        out.push_str(canonical);
        out.push_str("[redacted]");
        cursor = value_end;
    }
    out.push_str(&input[cursor..]);
    out
}
361
362fn build_fingerprint(
363    project: &BugMonitorMonitoredProject,
364    source: &BugMonitorLogSource,
365    event: &str,
366    message: &str,
367    stack_hint: Option<&String>,
368) -> String {
369    let normalized = normalize_dynamic(message);
370    let stack = stack_hint.map(|s| normalize_dynamic(s)).unwrap_or_default();
371    format!(
372        "{}:{}:{}:{}",
373        project.project_id,
374        source.source_id,
375        event,
376        &crate::sha256_hex(&[&normalized, &stack])[..16]
377    )
378}
379
/// Collapses the variable parts of a message so similar messages compare equal.
///
/// The text is split on whitespace; any token that contains an ASCII digit or
/// is longer than 40 bytes becomes `<var>`. Tokens are re-joined with single
/// spaces.
fn normalize_dynamic(value: &str) -> String {
    let mut normalized = String::with_capacity(value.len());
    for token in value.split_whitespace() {
        if !normalized.is_empty() {
            normalized.push(' ');
        }
        let volatile = token.len() > 40 || token.bytes().any(|byte| byte.is_ascii_digit());
        normalized.push_str(if volatile { "<var>" } else { token });
    }
    normalized
}
393
/// Trims `value` and shortens it to at most 100 bytes for use in a title,
/// appending `...` when anything was cut.
///
/// The cut is moved back to the nearest UTF-8 character boundary so that
/// non-ASCII log text can never cause a slicing panic.
fn summarize_title(value: &str) -> String {
    const MAX_TITLE_BYTES: usize = 100;

    let clean = value.trim();
    if clean.len() <= MAX_TITLE_BYTES {
        return clean.to_string();
    }
    // Index 0 is always a boundary, so this loop terminates.
    let mut cut = MAX_TITLE_BYTES;
    while !clean.is_char_boundary(cut) {
        cut -= 1;
    }
    format!("{}...", &clean[..cut])
}
402
#[cfg(test)]
mod tests {
    use super::*;

    /// Absolute path reported for the monitored log in every test.
    const LOG_PATH: &str = "/tmp/customer-api/logs/app.log";

    /// Minimal monitored project used by all tests.
    fn project() -> BugMonitorMonitoredProject {
        let mut fixture = BugMonitorMonitoredProject::default();
        fixture.project_id = "customer-api".to_string();
        fixture.name = "Customer API".to_string();
        fixture.repo = "owner/customer-api".to_string();
        fixture.workspace_root = "/tmp/customer-api".to_string();
        fixture
    }

    /// Log source fixture with the requested format.
    fn source(format: BugMonitorLogFormat) -> BugMonitorLogSource {
        let mut fixture = BugMonitorLogSource::default();
        fixture.source_id = "api-log".to_string();
        fixture.path = "logs/app.log".to_string();
        fixture.format = format;
        fixture
    }

    #[test]
    fn json_error_line_becomes_candidate() {
        let mut payload = br#"{"level":"error","message":"upload failed","exception":{"type":"TypeError","stacktrace":"TypeError: bad\n at normalize src/uploads.ts:42:1"},"service":"api"}"#.to_vec();
        payload.push(b'\n');

        let parsed = parse_log_candidates(
            &project(),
            &source(BugMonitorLogFormat::Json),
            Path::new(LOG_PATH),
            Some("1".to_string()),
            10,
            &payload,
            None,
            None,
        );

        assert_eq!(parsed.candidates.len(), 1);
        let found = &parsed.candidates[0];
        assert_eq!(found.repo, "owner/customer-api");
        assert_eq!(found.level, "error");
        assert!(found
            .excerpt
            .iter()
            .any(|row| row.contains("upload failed")));
        assert!(found.fingerprint.contains("customer-api:api-log"));
    }

    #[test]
    fn plaintext_redacts_secret_and_groups_stack() {
        let raw = b"INFO booted\nERROR upload failed token=super-secret\n    at normalize src/uploads.ts:42:1\n";

        let parsed = parse_log_candidates(
            &project(),
            &source(BugMonitorLogFormat::Plaintext),
            Path::new(LOG_PATH),
            None,
            0,
            raw,
            None,
            None,
        );

        assert_eq!(parsed.candidates.len(), 1);
        let found = &parsed.candidates[0];
        assert!(found
            .raw_excerpt_redacted
            .iter()
            .any(|row| row.contains("token=[redacted]")));
        assert!(found.excerpt.iter().any(|row| row.contains("normalize")));
    }

    #[test]
    fn partial_line_tracks_start_offset() {
        let parsed = parse_log_candidates(
            &project(),
            &source(BugMonitorLogFormat::Plaintext),
            Path::new(LOG_PATH),
            None,
            100,
            b"ERROR half",
            None,
            None,
        );

        assert!(parsed.candidates.is_empty());
        assert_eq!(parsed.next_partial_line.as_deref(), Some("ERROR half"));
        assert_eq!(parsed.next_partial_line_offset_start, Some(100));
    }
}