lean_ctx/core/patterns/
mypy.rs1use regex::Regex;
2use std::sync::OnceLock;
3
4static MYPY_ERROR_RE: OnceLock<Regex> = OnceLock::new();
5static MYPY_SUMMARY_RE: OnceLock<Regex> = OnceLock::new();
6
7fn error_re() -> &'static Regex {
8 MYPY_ERROR_RE.get_or_init(|| {
9 Regex::new(r"^(.+?):(\d+):\s+(error|warning|note):\s+(.+?)(?:\s+\[(.+)\])?$").unwrap()
10 })
11}
12
13fn summary_re() -> &'static Regex {
14 MYPY_SUMMARY_RE.get_or_init(|| Regex::new(r"Found (\d+) errors? in (\d+) files?").unwrap())
15}
16
17pub fn compress(_command: &str, output: &str) -> Option<String> {
18 let trimmed = output.trim();
19 if trimmed.is_empty() {
20 return Some("ok".to_string());
21 }
22
23 if trimmed == "Success: no issues found in source files" || trimmed.contains("no issues found")
24 {
25 return Some("clean".to_string());
26 }
27
28 let mut by_code: std::collections::HashMap<String, u32> = std::collections::HashMap::new();
29 let mut by_severity: std::collections::HashMap<String, u32> = std::collections::HashMap::new();
30 let mut files: std::collections::HashSet<String> = std::collections::HashSet::new();
31 let mut first_errors: Vec<String> = Vec::new();
32
33 for line in trimmed.lines() {
34 if let Some(caps) = error_re().captures(line) {
35 let file = caps[1].to_string();
36 let severity = caps[3].to_string();
37 let msg = caps[4].to_string();
38 let code = caps.get(5).map(|m| m.as_str().to_string());
39
40 files.insert(file.clone());
41 *by_severity.entry(severity).or_insert(0) += 1;
42
43 if let Some(ref c) = code {
44 *by_code.entry(c.clone()).or_insert(0) += 1;
45 }
46
47 if first_errors.len() < 5 {
48 let short_file = file.rsplit('/').next().unwrap_or(&file);
49 let line_num = &caps[2];
50 let code_str = code.as_deref().unwrap_or("?");
51 first_errors.push(format!(" {short_file}:{line_num} [{code_str}] {msg}"));
52 }
53 }
54 }
55
56 if let Some(caps) = summary_re().captures(trimmed) {
57 let errors = &caps[1];
58 let file_count = &caps[2];
59
60 let mut parts = vec![format!("{errors} errors in {file_count} files")];
61
62 if !by_code.is_empty() {
63 let mut codes: Vec<(String, u32)> = by_code.into_iter().collect();
64 codes.sort_by(|a, b| b.1.cmp(&a.1));
65 for (code, count) in codes.iter().take(6) {
66 parts.push(format!(" [{code}]: {count}"));
67 }
68 if codes.len() > 6 {
69 parts.push(format!(" ... +{} more codes", codes.len() - 6));
70 }
71 }
72
73 if !first_errors.is_empty() {
74 parts.push("Top errors:".to_string());
75 parts.extend(first_errors);
76 }
77
78 return Some(parts.join("\n"));
79 }
80
81 if !files.is_empty() {
82 let total: u32 = by_severity.values().sum();
83 let err_count = by_severity.get("error").copied().unwrap_or(0);
84 let warn_count = by_severity.get("warning").copied().unwrap_or(0);
85
86 let mut parts = vec![format!(
87 "{total} issues in {} files ({err_count} errors, {warn_count} warnings)",
88 files.len()
89 )];
90
91 if !first_errors.is_empty() {
92 parts.extend(first_errors);
93 }
94
95 return Some(parts.join("\n"));
96 }
97
98 let lines: Vec<&str> = trimmed.lines().collect();
99 if lines.len() <= 8 {
100 Some(trimmed.to_string())
101 } else {
102 Some(format!(
103 "{}\n... ({} more lines)",
104 lines[..8].join("\n"),
105 lines.len() - 8
106 ))
107 }
108}
109
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs `compress` and unwraps the result, failing the test if it returned `None`.
    fn summarize(command: &str, raw: &str) -> String {
        compress(command, raw).unwrap()
    }

    /// The success message collapses to a single word.
    #[test]
    fn mypy_clean_output() {
        let summary = summarize("mypy .", "Success: no issues found in source files");
        assert_eq!(summary, "clean");
    }

    /// Empty output is reported as "ok".
    #[test]
    fn mypy_empty_output() {
        let summary = summarize("mypy .", "");
        assert_eq!(summary, "ok");
    }

    /// With a summary line present, the headline and per-code tallies are reported.
    #[test]
    fn mypy_errors_with_summary() {
        let raw = r#"src/auth.py:42: error: Argument 1 to "validate" has incompatible type "str"; expected "int" [arg-type]
src/auth.py:55: error: Missing return statement [return]
src/db.py:10: error: Name "cursor" is not defined [name-defined]
Found 3 errors in 2 files (checked 15 source files)"#;
        let summary = summarize("mypy .", raw);
        for needle in ["3 errors in 2 files", "[arg-type]"] {
            assert!(summary.contains(needle));
        }
    }

    /// Without a summary line, totals are computed from the parsed diagnostics.
    #[test]
    fn mypy_errors_without_summary() {
        let raw = r#"src/main.py:10: error: Incompatible return value type [return-value]
src/main.py:20: warning: Unused "type: ignore" comment [unused-ignore]"#;
        let summary = summarize("mypy src/", raw);
        for needle in ["2 issues", "1 errors", "1 warnings"] {
            assert!(summary.contains(needle));
        }
    }

    /// Note lines mixed in with errors do not break the summary headline.
    #[test]
    fn mypy_with_notes() {
        let raw = "src/api.py:5: error: Missing type annotation [no-untyped-def]\nsrc/api.py:5: note: Use --disallow-untyped-defs\nFound 1 error in 1 file (checked 3 source files)";
        let summary = summarize("mypy .", raw);
        assert!(summary.contains("1 error"));
    }
}