1use std::collections::BTreeMap;
2
3use crate::compress::generic::{dedup_consecutive, middle_truncate, strip_ansi, GenericCompressor};
4use crate::compress::{CompressionResult, Compressor};
5
/// Maximum number of lines kept in compressed mypy output; `finish` passes it
/// to `cap_lines` as the line cap.
const MAX_LINES: usize = 300;

/// Compressor for the output of the `mypy` type checker.
///
/// The type carries no state, so it derives the cheap standard traits; this
/// lets it be logged with `{:?}`, copied freely and built via `Default`.
#[derive(Debug, Clone, Copy, Default)]
pub struct MypyCompressor;
9
10impl Compressor for MypyCompressor {
11 fn matches(&self, command: &str) -> bool {
12 let tokens = command_tokens(command).collect::<Vec<_>>();
13 tokens.iter().any(|token| token == "mypy")
14 || tokens
15 .windows(3)
16 .any(|window| matches!(window, [python, flag, module] if (python == "python" || python == "python3") && flag == "-m" && module == "mypy"))
17 }
18
19 fn compress_with_exit_code(
20 &self,
21 _command: &str,
22 output: &str,
23 exit_code: Option<i32>,
24 ) -> CompressionResult {
25 if matches!(exit_code, Some(code) if code != 0)
26 && output.trim().starts_with("Success: no issues found")
27 {
28 return GenericCompressor::compress_output(output).into();
29 }
30 compress_mypy(output).into()
31 }
32
33 fn matches_output(&self, output: &str) -> bool {
34 output.lines().any(|line| {
35 let trimmed = line.trim();
36 is_mypy_success_signature(trimmed) || is_mypy_error_summary_signature(trimmed)
37 })
38 }
39}
40
/// Returns `true` when `trimmed` is exactly
/// `Success: no issues found in <digits> source file` or the same text with
/// a trailing `s`.
fn is_mypy_success_signature(trimmed: &str) -> bool {
    let parts = trimmed
        .strip_prefix("Success: no issues found in ")
        .and_then(|tail| tail.split_once(" source file"));

    match parts {
        Some((count, suffix)) => {
            // The count must be a non-empty run of ASCII digits and nothing
            // but an optional plural "s" may follow "source file".
            let numeric = !count.is_empty() && count.bytes().all(|byte| byte.is_ascii_digit());
            numeric && (suffix.is_empty() || suffix == "s")
        }
        None => false,
    }
}
50
/// Returns `true` when `trimmed` has the shape
/// `Found <digits> error(s) in <digits> file…`.
///
/// Anything may follow the word `file` (for example `s (checked 3 source
/// files)`); only the prefix is validated.
fn is_mypy_error_summary_signature(trimmed: &str) -> bool {
    /// Consumes `<digits><space>` from the front of `text` and returns the
    /// remainder, or `None` when the first word is not a non-empty number.
    fn leading_count(text: &str) -> Option<&str> {
        let (digits, tail) = text.split_once(' ')?;
        let numeric = !digits.is_empty() && digits.bytes().all(|byte| byte.is_ascii_digit());
        numeric.then_some(tail)
    }

    let after_file_count = trimmed
        .strip_prefix("Found ")
        .and_then(leading_count)
        .and_then(|tail| {
            tail.strip_prefix("error in ")
                .or_else(|| tail.strip_prefix("errors in "))
        })
        .and_then(leading_count);

    // "files" also starts with "file", so a single prefix test covers both.
    matches!(after_file_count, Some(tail) if tail.starts_with("file"))
}
74
75fn compress_mypy(output: &str) -> String {
76 let trimmed = output.trim();
77 if trimmed.starts_with("Success: no issues found") {
78 return "mypy: clean".to_string();
79 }
80
81 let mut by_file: BTreeMap<String, Vec<String>> = BTreeMap::new();
82 let mut fileless = Vec::new();
83 let mut summary = None;
84 let mut previous_error_file: Option<String> = None;
85
86 for line in output.lines() {
87 let trimmed = line.trim_end();
88 if is_summary_line(trimmed) {
89 summary = Some(trimmed.to_string());
90 previous_error_file = None;
91 continue;
92 }
93
94 if let Some((file, severity)) = parse_mypy_line(trimmed) {
95 match severity {
96 "error" => {
97 by_file
98 .entry(file.to_string())
99 .or_default()
100 .push(trimmed.to_string());
101 previous_error_file = Some(file.to_string());
102 }
103 "note" => {
104 if previous_error_file.as_deref() == Some(file) {
105 by_file
106 .entry(file.to_string())
107 .or_default()
108 .push(trimmed.to_string());
109 }
110 }
111 _ => previous_error_file = None,
112 }
113 } else if trimmed.contains("error:") && !trimmed.is_empty() {
114 fileless.push(trimmed.to_string());
115 previous_error_file = None;
116 } else {
117 previous_error_file = None;
118 }
119 }
120
121 let mut lines = Vec::new();
122 lines.extend(fileless);
123 for (_file, diagnostics) in by_file {
124 if !lines.is_empty() && !diagnostics.is_empty() {
125 lines.push(String::new());
126 }
127 lines.extend(diagnostics);
128 }
129 if let Some(summary) = summary {
130 if !lines.is_empty() {
131 lines.push(String::new());
132 }
133 lines.push(summary);
134 }
135
136 if lines.is_empty() {
137 return output.trim_end().to_string();
138 }
139
140 finish(&lines.join("\n"))
141}
142
/// Splits `command` into normalized tokens for matching.
///
/// Each whitespace-separated token has surrounding single/double quotes
/// trimmed; the wrapper words `npx`, `pnpm`, `yarn`, `bun`, `bunx` and `exec`
/// are dropped; and the remaining tokens are reduced to their final path
/// component (both `/` and `\` are treated as separators) with a trailing
/// Windows launcher suffix removed.
///
/// Besides `.cmd`, the `.exe` suffix is stripped as well, so that
/// `C:\Python\Scripts\mypy.exe` and `python.exe -m mypy` normalize to the
/// same tokens as their suffix-less forms.
fn command_tokens(command: &str) -> impl Iterator<Item = String> + '_ {
    const WRAPPERS: &[&str] = &["npx", "pnpm", "yarn", "bun", "bunx", "exec"];

    command
        .split_whitespace()
        .map(|token| token.trim_matches(|ch| matches!(ch, '\'' | '"')))
        .filter(|token| !WRAPPERS.contains(token))
        .map(|token| {
            let name = token.rsplit(['/', '\\']).next().unwrap_or(token);
            name.trim_end_matches(".cmd")
                .trim_end_matches(".exe")
                .to_string()
        })
}
157
158fn parse_mypy_line(line: &str) -> Option<(&str, &str)> {
159 let (file, rest) = line.split_once(':')?;
160 let rest = rest.trim_start();
161 let (_, rest) = split_number_prefix(rest)?;
162 let rest = rest.strip_prefix(':')?.trim_start();
163 let rest = if let Some(stripped) = strip_column(rest) {
164 stripped
165 } else {
166 rest
167 };
168 let (severity, _) = rest.split_once(':')?;
169 if matches!(severity, "error" | "note") {
170 Some((file, severity))
171 } else {
172 None
173 }
174}
175
/// Removes a leading `<digits>:` column segment from `rest`.
///
/// Returns the text after the colon with leading whitespace trimmed, or
/// `None` when `rest` has no colon or the text before the first colon is not
/// a non-empty run of ASCII digits.
///
/// The explicit emptiness check matters: `all()` is vacuously true for an
/// empty string, which would otherwise make a bare `:` count as a column.
fn strip_column(rest: &str) -> Option<&str> {
    let (maybe_column, tail) = rest.split_once(':')?;
    let is_number =
        !maybe_column.is_empty() && maybe_column.bytes().all(|byte| byte.is_ascii_digit());
    is_number.then(|| tail.trim_start())
}
184
/// Splits `input` into its leading run of ASCII digits and the remainder.
///
/// Returns `None` when `input` does not start with an ASCII digit (including
/// when it is empty).
fn split_number_prefix(input: &str) -> Option<(&str, &str)> {
    // ASCII digits are one byte each, so the index of the first non-digit
    // character is also the byte length of the digit prefix.
    let prefix_len = input
        .find(|ch: char| !ch.is_ascii_digit())
        .unwrap_or(input.len());

    if prefix_len == 0 {
        None
    } else {
        Some(input.split_at(prefix_len))
    }
}
193
/// Returns `true` for lines treated as the mypy run summary: anything that
/// starts with the success banner, or that starts with `Found ` and contains
/// ` error` somewhere in the line.
fn is_summary_line(trimmed: &str) -> bool {
    if trimmed.starts_with("Success: no issues found") {
        return true;
    }
    trimmed.starts_with("Found ") && trimmed.contains(" error")
}
198
199fn finish(input: &str) -> String {
200 let stripped = strip_ansi(input);
201 let deduped = dedup_consecutive(&stripped);
202 cap_lines(
203 &middle_truncate(&deduped, 32 * 1024, 16 * 1024, 16 * 1024),
204 MAX_LINES,
205 )
206}
207
/// Limits `input` to at most `max_lines` lines.
///
/// When the input fits, it is returned with trailing whitespace trimmed.
/// Otherwise the first `max_lines` lines are joined with `\n` and followed by
/// a `... truncated N lines` marker, where `N` is the number of lines dropped.
fn cap_lines(input: &str, max_lines: usize) -> String {
    let total = input.lines().count();
    if total <= max_lines {
        return input.trim_end().to_string();
    }

    let dropped = total - max_lines;
    let head: Vec<&str> = input.lines().take(max_lines).collect();

    let mut capped = head.join("\n");
    capped.push_str(&format!("\n... truncated {dropped} lines"));
    capped
}
225
#[cfg(test)]
mod tests {
    use super::*;

    /// Command lines containing a `mypy` token match; unrelated ones do not.
    #[test]
    fn matches_mypy_invocations() {
        let compressor = MypyCompressor;

        for command in [
            "mypy src",
            "python -m mypy src",
            "python3 -m mypy --strict",
            "uv run mypy src",
        ] {
            assert!(compressor.matches(command), "expected a match for {command:?}");
        }

        for command in ["cargo build", "ls"] {
            assert!(!compressor.matches(command), "expected no match for {command:?}");
        }
    }

    /// A clean run collapses to the short `mypy: clean` marker.
    #[test]
    fn compresses_real_success_case() {
        let raw = "Success: no issues found in 1 source file\n";
        let result = compress_mypy(raw);

        assert_eq!(result, "mypy: clean");
        assert!(result.len() < raw.len());
    }

    /// Every error line and the summary line appear in the compressed text.
    #[test]
    fn preserves_error_lines_and_summary() {
        let expected_lines = [
            "src/a.py:10: error: Incompatible types in assignment [assignment]",
            "src/a.py:15: error: Missing return statement [return]",
            "src/b.py:5: error: Argument 1 to \"foo\" has incompatible type \"str\"; expected \"int\" [arg-type]",
            "Found 3 errors in 2 files (checked 50 source files)",
        ];
        let output = format!("{}\n", expected_lines.join("\n"));

        let compressed = compress_mypy(&output);

        for expected in expected_lines {
            assert!(compressed.contains(expected), "missing line: {expected}");
        }
    }

    /// Notes directly following an error in the same file are kept; notes
    /// without such an error are dropped.
    #[test]
    fn keeps_attached_notes_and_drops_standalone_notes() {
        let output = [
            "src/a.py:1: note: Standalone note",
            "src/a.py:10: error: Incompatible types in assignment [assignment]",
            "src/a.py:10: note: Expected int",
            "src/b.py:8: note: Use `Type[X]` for class types",
            "Found 1 error in 1 file (checked 2 source files)",
            "",
        ]
        .join("\n");

        let compressed = compress_mypy(&output);

        for kept in [
            "src/a.py:10: error: Incompatible types in assignment [assignment]",
            "src/a.py:10: note: Expected int",
        ] {
            assert!(compressed.contains(kept), "missing line: {kept}");
        }
        for dropped in ["Standalone note", "Use `Type[X]`"] {
            assert!(!compressed.contains(dropped), "unexpected text: {dropped}");
        }
    }

    /// Hundreds of standalone notes are discarded while the single error and
    /// the summary survive, shrinking the output by more than half.
    #[test]
    fn compresses_large_note_heavy_input() {
        let mut output: String = (0..500)
            .map(|index| {
                format!(
                    "src/file{}.py:{}: note: Standalone informational note that should be dropped\n",
                    index,
                    index + 1
                )
            })
            .collect();
        output.push_str("src/a.py:10: error: Incompatible types in assignment [assignment]\n");
        output.push_str("Found 1 error in 1 file (checked 501 source files)\n");

        let compressed = compress_mypy(&output);

        assert!(compressed
            .contains("src/a.py:10: error: Incompatible types in assignment [assignment]"));
        assert!(compressed.contains("Found 1 error in 1 file"));
        assert!(compressed.len() < output.len() / 2);
        assert!(!compressed.contains("Standalone informational"));
    }
}