1use std::collections::BTreeMap;
2
3use crate::compress::generic::{dedup_consecutive, middle_truncate, strip_ansi};
4use crate::compress::Compressor;
5
/// Upper bound on the number of lines kept in compressed output; `finish`
/// passes it to `cap_lines`, which replaces any excess with a truncation marker.
const MAX_LINES: usize = 300;
7
8pub struct MypyCompressor;
9
10impl Compressor for MypyCompressor {
11 fn matches(&self, command: &str) -> bool {
12 let tokens = command_tokens(command).collect::<Vec<_>>();
13 tokens.iter().any(|token| token == "mypy")
14 || tokens
15 .windows(3)
16 .any(|window| matches!(window, [python, flag, module] if (python == "python" || python == "python3") && flag == "-m" && module == "mypy"))
17 }
18
19 fn compress(&self, _command: &str, output: &str) -> String {
20 compress_mypy(output)
21 }
22}
23
24fn compress_mypy(output: &str) -> String {
25 let trimmed = output.trim();
26 if trimmed.starts_with("Success: no issues found") {
27 return "mypy: clean".to_string();
28 }
29
30 let mut by_file: BTreeMap<String, Vec<String>> = BTreeMap::new();
31 let mut fileless = Vec::new();
32 let mut summary = None;
33 let mut previous_error_file: Option<String> = None;
34
35 for line in output.lines() {
36 let trimmed = line.trim_end();
37 if is_summary_line(trimmed) {
38 summary = Some(trimmed.to_string());
39 previous_error_file = None;
40 continue;
41 }
42
43 if let Some((file, severity)) = parse_mypy_line(trimmed) {
44 match severity {
45 "error" => {
46 by_file
47 .entry(file.to_string())
48 .or_default()
49 .push(trimmed.to_string());
50 previous_error_file = Some(file.to_string());
51 }
52 "note" => {
53 if previous_error_file.as_deref() == Some(file) {
54 by_file
55 .entry(file.to_string())
56 .or_default()
57 .push(trimmed.to_string());
58 }
59 }
60 _ => previous_error_file = None,
61 }
62 } else if trimmed.contains("error:") && !trimmed.is_empty() {
63 fileless.push(trimmed.to_string());
64 previous_error_file = None;
65 } else {
66 previous_error_file = None;
67 }
68 }
69
70 let mut lines = Vec::new();
71 lines.extend(fileless);
72 for (_file, diagnostics) in by_file {
73 if !lines.is_empty() && !diagnostics.is_empty() {
74 lines.push(String::new());
75 }
76 lines.extend(diagnostics);
77 }
78 if let Some(summary) = summary {
79 if !lines.is_empty() {
80 lines.push(String::new());
81 }
82 lines.push(summary);
83 }
84
85 if lines.is_empty() {
86 return output.trim_end().to_string();
87 }
88
89 finish(&lines.join("\n"))
90}
91
/// Splits `command` into normalised tokens for command matching.
///
/// Each whitespace-separated token has surrounding single/double quotes
/// removed; package-runner words (`npx`, `pnpm`, `yarn`, `bun`, `bunx`,
/// `exec`) are skipped; the remaining tokens are reduced to their final path
/// component (both `/` and `\` separators) with Windows launcher suffixes
/// (`.cmd`, `.exe`) removed, so `C:\venv\Scripts\mypy.exe` yields `mypy`.
fn command_tokens(command: &str) -> impl Iterator<Item = String> + '_ {
    command
        .split_whitespace()
        .map(|token| token.trim_matches(|ch| matches!(ch, '\'' | '"')))
        .filter(|token| !matches!(*token, "npx" | "pnpm" | "yarn" | "bun" | "bunx" | "exec"))
        .map(|token| {
            let name = token.rsplit(['/', '\\']).next().unwrap_or(token);
            let name = name.trim_end_matches(".cmd");
            // pip installs console scripts as `<name>.exe` on Windows.
            name.strip_suffix(".exe").unwrap_or(name).to_string()
        })
}
106
/// Parses a located mypy diagnostic of the form
/// `file:line[:column[:end_line:end_column]]: severity: message`.
///
/// Returns the file path and the severity, both borrowed from `line`, when the
/// severity is `error` or `note`; returns `None` for every other line.
fn parse_mypy_line(line: &str) -> Option<(&str, &str)> {
    let (file, rest) = split_file_prefix(line)?;
    let (_, rest) = split_number_prefix(rest.trim_start())?;
    let mut rest = rest.strip_prefix(':')?.trim_start();
    // `--show-column-numbers` adds one numeric field after the line number and
    // `--show-error-end` adds three; skip however many are present.
    while let Some(stripped) = strip_column(rest) {
        rest = stripped;
    }
    let (severity, _) = rest.split_once(':')?;
    if matches!(severity, "error" | "note") {
        Some((file, severity))
    } else {
        None
    }
}

/// Splits `line` at the colon that terminates the file path, returning the
/// path and everything after that colon.
///
/// The colon of a Windows drive prefix (`C:\` or `C:/`) is part of the path
/// and is therefore skipped when looking for the separator.
fn split_file_prefix(line: &str) -> Option<(&str, &str)> {
    let search_from = match line.as_bytes() {
        [drive, b':', separator, ..]
            if drive.is_ascii_alphabetic() && matches!(separator, b'\\' | b'/') =>
        {
            // Both skipped bytes are ASCII, so index 2 is a char boundary.
            2
        }
        _ => 0,
    };
    let colon = search_from + line[search_from..].find(':')?;
    Some((&line[..colon], &line[colon + 1..]))
}

/// Removes a leading `digits:` field from `rest`.
///
/// Returns the text after the colon with leading whitespace trimmed, or `None`
/// when `rest` has no colon or the text before the first colon contains a
/// non-digit character.
fn strip_column(rest: &str) -> Option<&str> {
    let (maybe_column, tail) = rest.split_once(':')?;
    if maybe_column.chars().all(|ch| ch.is_ascii_digit()) {
        Some(tail.trim_start())
    } else {
        None
    }
}

/// Splits `input` into its leading run of ASCII digits and the remainder.
///
/// Returns `None` when `input` does not start with a digit.
fn split_number_prefix(input: &str) -> Option<(&str, &str)> {
    // ASCII digits are one byte each, so the byte count is a valid split index.
    let digits = input.bytes().take_while(u8::is_ascii_digit).count();
    if digits == 0 {
        None
    } else {
        Some(input.split_at(digits))
    }
}
142
/// Tells whether `trimmed` is one of mypy's closing summary lines: either the
/// `Success: no issues found ...` line or a `Found ...` line that mentions
/// ` error` somewhere in it.
fn is_summary_line(trimmed: &str) -> bool {
    if trimmed.starts_with("Success: no issues found") {
        return true;
    }
    match trimmed.strip_prefix("Found ") {
        // The search covers the whole line, not just the part after `Found `.
        Some(_) => trimmed.contains(" error"),
        None => false,
    }
}
147
148fn finish(input: &str) -> String {
149 let stripped = strip_ansi(input);
150 let deduped = dedup_consecutive(&stripped);
151 cap_lines(
152 &middle_truncate(&deduped, 32 * 1024, 16 * 1024, 16 * 1024),
153 MAX_LINES,
154 )
155}
156
/// Limits `input` to at most `max_lines` lines.
///
/// Input within the limit is returned with trailing whitespace trimmed.
/// Otherwise the first `max_lines` lines are kept and followed by a
/// `... truncated N lines` marker stating how many lines were removed.
fn cap_lines(input: &str, max_lines: usize) -> String {
    let total = input.lines().count();
    if total <= max_lines {
        return input.trim_end().to_string();
    }

    let head: Vec<&str> = input.lines().take(max_lines).collect();
    format!(
        "{}\n... truncated {} lines",
        head.join("\n"),
        total - max_lines
    )
}
174
#[cfg(test)]
mod tests {
    use super::*;

    /// mypy is recognised when run directly, as a Python module, or through a
    /// runner; unrelated commands are not claimed.
    #[test]
    fn matches_mypy_invocations() {
        let claims = |command: &str| MypyCompressor.matches(command);

        assert!(claims("mypy src"));
        assert!(claims("python -m mypy src"));
        assert!(claims("python3 -m mypy --strict"));
        assert!(claims("uv run mypy src"));

        assert!(!claims("cargo build"));
        assert!(!claims("ls"));
    }

    /// A clean run collapses to a short marker that is smaller than the input.
    #[test]
    fn compresses_real_success_case() {
        let raw = "Success: no issues found in 1 source file\n";
        let result = compress_mypy(raw);

        assert_eq!(result, "mypy: clean");
        assert!(result.len() < raw.len());
    }

    /// Every error line and the closing summary survive compression verbatim.
    #[test]
    fn preserves_error_lines_and_summary() {
        let raw = "src/a.py:10: error: Incompatible types in assignment [assignment]\nsrc/a.py:15: error: Missing return statement [return]\nsrc/b.py:5: error: Argument 1 to \"foo\" has incompatible type \"str\"; expected \"int\" [arg-type]\nFound 3 errors in 2 files (checked 50 source files)\n";
        let result = compress_mypy(raw);
        let kept = |needle: &str| result.contains(needle);

        assert!(kept("src/a.py:10: error: Incompatible types in assignment [assignment]"));
        assert!(kept("src/a.py:15: error: Missing return statement [return]"));
        assert!(kept("src/b.py:5: error: Argument 1 to \"foo\" has incompatible type \"str\"; expected \"int\" [arg-type]"));
        assert!(kept("Found 3 errors in 2 files (checked 50 source files)"));
    }

    /// Notes following an error in the same file are kept; notes without a
    /// preceding error in their file are removed.
    #[test]
    fn keeps_attached_notes_and_drops_standalone_notes() {
        let raw = "src/a.py:1: note: Standalone note\nsrc/a.py:10: error: Incompatible types in assignment [assignment]\nsrc/a.py:10: note: Expected int\nsrc/b.py:8: note: Use `Type[X]` for class types\nFound 1 error in 1 file (checked 2 source files)\n";
        let result = compress_mypy(raw);
        let kept = |needle: &str| result.contains(needle);

        assert!(kept("src/a.py:10: error: Incompatible types in assignment [assignment]"));
        assert!(kept("src/a.py:10: note: Expected int"));
        assert!(!kept("Standalone note"));
        assert!(!kept("Use `Type[X]`"));
    }

    /// Hundreds of stand-alone notes are discarded while the single error and
    /// the summary remain, shrinking the output to less than half its size.
    #[test]
    fn compresses_large_note_heavy_input() {
        let mut raw: String = (0..500)
            .map(|index| {
                format!(
                    "src/file{}.py:{}: note: Standalone informational note that should be dropped\n",
                    index,
                    index + 1
                )
            })
            .collect();
        raw.push_str("src/a.py:10: error: Incompatible types in assignment [assignment]\n");
        raw.push_str("Found 1 error in 1 file (checked 501 source files)\n");

        let result = compress_mypy(&raw);

        assert!(result.contains("src/a.py:10: error: Incompatible types in assignment [assignment]"));
        assert!(result.contains("Found 1 error in 1 file"));
        assert!(result.len() < raw.len() / 2);
        assert!(!result.contains("Standalone informational"));
    }
}