Skip to main content

lean_ctx/core/patterns/
mypy.rs

1macro_rules! static_regex {
2    ($pattern:expr) => {{
3        static RE: std::sync::OnceLock<regex::Regex> = std::sync::OnceLock::new();
4        RE.get_or_init(|| {
5            regex::Regex::new($pattern).expect(concat!("BUG: invalid static regex: ", $pattern))
6        })
7    }};
8}
9
10fn error_re() -> &'static regex::Regex {
11    static_regex!(r"^(.+?):(\d+):\s+(error|warning|note):\s+(.+?)(?:\s+\[(.+)\])?$")
12}
13
14fn summary_re() -> &'static regex::Regex {
15    static_regex!(r"Found (\d+) errors? in (\d+) files?")
16}
17
18pub fn compress(_command: &str, output: &str) -> Option<String> {
19    let trimmed = output.trim();
20    if trimmed.is_empty() {
21        return Some("ok".to_string());
22    }
23
24    if trimmed == "Success: no issues found in source files" || trimmed.contains("no issues found")
25    {
26        return Some("clean".to_string());
27    }
28
29    let mut by_code: std::collections::HashMap<String, u32> = std::collections::HashMap::new();
30    let mut by_severity: std::collections::HashMap<String, u32> = std::collections::HashMap::new();
31    let mut files: std::collections::HashSet<String> = std::collections::HashSet::new();
32    let mut first_errors: Vec<String> = Vec::new();
33
34    for line in trimmed.lines() {
35        if let Some(caps) = error_re().captures(line) {
36            let file = caps[1].to_string();
37            let severity = caps[3].to_string();
38            let msg = caps[4].to_string();
39            let code = caps.get(5).map(|m| m.as_str().to_string());
40
41            files.insert(file.clone());
42            *by_severity.entry(severity).or_insert(0) += 1;
43
44            if let Some(ref c) = code {
45                *by_code.entry(c.clone()).or_insert(0) += 1;
46            }
47
48            if first_errors.len() < 5 {
49                let short_file = file.rsplit('/').next().unwrap_or(&file);
50                let line_num = &caps[2];
51                let code_str = code.as_deref().unwrap_or("?");
52                first_errors.push(format!("  {short_file}:{line_num} [{code_str}] {msg}"));
53            }
54        }
55    }
56
57    if let Some(caps) = summary_re().captures(trimmed) {
58        let errors = &caps[1];
59        let file_count = &caps[2];
60
61        let mut parts = vec![format!("{errors} errors in {file_count} files")];
62
63        if !by_code.is_empty() {
64            let mut codes: Vec<(String, u32)> = by_code.into_iter().collect();
65            codes.sort_by_key(|x| std::cmp::Reverse(x.1));
66            for (code, count) in codes.iter().take(6) {
67                parts.push(format!("  [{code}]: {count}"));
68            }
69            if codes.len() > 6 {
70                parts.push(format!("  ... +{} more codes", codes.len() - 6));
71            }
72        }
73
74        if !first_errors.is_empty() {
75            parts.push("Top errors:".to_string());
76            parts.extend(first_errors);
77        }
78
79        return Some(parts.join("\n"));
80    }
81
82    if !files.is_empty() {
83        let total: u32 = by_severity.values().sum();
84        let err_count = by_severity.get("error").copied().unwrap_or(0);
85        let warn_count = by_severity.get("warning").copied().unwrap_or(0);
86
87        let mut parts = vec![format!(
88            "{total} issues in {} files ({err_count} errors, {warn_count} warnings)",
89            files.len()
90        )];
91
92        if !first_errors.is_empty() {
93            parts.extend(first_errors);
94        }
95
96        return Some(parts.join("\n"));
97    }
98
99    let lines: Vec<&str> = trimmed.lines().collect();
100    if lines.len() <= 8 {
101        Some(trimmed.to_string())
102    } else {
103        Some(format!(
104            "{}\n... ({} more lines)",
105            lines[..8].join("\n"),
106            lines.len() - 8
107        ))
108    }
109}
110
111#[cfg(test)]
112mod tests {
113    use super::*;
114
115    #[test]
116    fn mypy_clean_output() {
117        let output = "Success: no issues found in source files";
118        assert_eq!(compress("mypy .", output).unwrap(), "clean");
119    }
120
121    #[test]
122    fn mypy_empty_output() {
123        assert_eq!(compress("mypy .", "").unwrap(), "ok");
124    }
125
126    #[test]
127    fn mypy_errors_with_summary() {
128        let output = r#"src/auth.py:42: error: Argument 1 to "validate" has incompatible type "str"; expected "int"  [arg-type]
129src/auth.py:55: error: Missing return statement  [return]
130src/db.py:10: error: Name "cursor" is not defined  [name-defined]
131Found 3 errors in 2 files (checked 15 source files)"#;
132        let result = compress("mypy .", output).unwrap();
133        assert!(result.contains("3 errors in 2 files"));
134        assert!(result.contains("[arg-type]"));
135    }
136
137    #[test]
138    fn mypy_errors_without_summary() {
139        let output = r#"src/main.py:10: error: Incompatible return value type  [return-value]
140src/main.py:20: warning: Unused "type: ignore" comment  [unused-ignore]"#;
141        let result = compress("mypy src/", output).unwrap();
142        assert!(result.contains("2 issues"));
143        assert!(result.contains("1 errors"));
144        assert!(result.contains("1 warnings"));
145    }
146
147    #[test]
148    fn mypy_with_notes() {
149        let output = "src/api.py:5: error: Missing type annotation  [no-untyped-def]\nsrc/api.py:5: note: Use --disallow-untyped-defs\nFound 1 error in 1 file (checked 3 source files)";
150        let result = compress("mypy .", output).unwrap();
151        assert!(result.contains("1 error"));
152    }
153}