1use std::collections::BTreeMap;
2
3use crate::compress::generic::{dedup_consecutive, middle_truncate, strip_ansi};
4use crate::compress::{CompressionResult, Compressor};
5
/// Line budget that `finish` passes to `cap_lines` when bounding the compressed output.
const MAX_LINES: usize = 300;
7
8pub struct MypyCompressor;
9
10impl Compressor for MypyCompressor {
11 fn matches(&self, command: &str) -> bool {
12 let tokens = command_tokens(command).collect::<Vec<_>>();
13 tokens.iter().any(|token| token == "mypy")
14 || tokens
15 .windows(3)
16 .any(|window| matches!(window, [python, flag, module] if (python == "python" || python == "python3") && flag == "-m" && module == "mypy"))
17 }
18
19 fn compress(&self, _command: &str, output: &str) -> CompressionResult {
20 compress_mypy(output).into()
21 }
22
23 fn matches_output(&self, output: &str) -> bool {
24 output.lines().any(|line| {
25 let trimmed = line.trim();
26 is_mypy_success_signature(trimmed) || is_mypy_error_summary_signature(trimmed)
27 })
28 }
29}
30
/// Returns `true` when `trimmed` is exactly
/// `Success: no issues found in <digits> source file` or the same text with a
/// trailing `s`.
fn is_mypy_success_signature(trimmed: &str) -> bool {
    let parts = trimmed
        .strip_prefix("Success: no issues found in ")
        .and_then(|tail| tail.split_once(" source file"));

    match parts {
        Some((count, plural)) => {
            let numeric = !count.is_empty() && count.bytes().all(|byte| byte.is_ascii_digit());
            // Nothing may follow the noun except the optional plural `s`.
            numeric && (plural.is_empty() || plural == "s")
        }
        None => false,
    }
}
40
/// Returns `true` when `trimmed` has the shape
/// `Found <digits> error(s) in <digits> file...`.
///
/// Both counts must be non-empty runs of ASCII digits, and the word after the
/// file count only has to start with `file`, so any trailing text is accepted.
fn is_mypy_error_summary_signature(trimmed: &str) -> bool {
    /// A non-empty run of ASCII digits.
    fn is_count(text: &str) -> bool {
        !text.is_empty() && text.bytes().all(|byte| byte.is_ascii_digit())
    }

    /// Splits the line into (error count, file count, text after the file count).
    fn counts_and_noun(line: &str) -> Option<(&str, &str, &str)> {
        let (errors, tail) = line.strip_prefix("Found ")?.split_once(' ')?;
        let tail = tail
            .strip_prefix("errors in ")
            .or_else(|| tail.strip_prefix("error in "))?;
        let (files, noun) = tail.split_once(' ')?;
        Some((errors, files, noun))
    }

    match counts_and_noun(trimmed) {
        Some((errors, files, noun)) => {
            is_count(errors) && is_count(files) && noun.starts_with("file")
        }
        None => false,
    }
}
64
65fn compress_mypy(output: &str) -> String {
66 let trimmed = output.trim();
67 if trimmed.starts_with("Success: no issues found") {
68 return "mypy: clean".to_string();
69 }
70
71 let mut by_file: BTreeMap<String, Vec<String>> = BTreeMap::new();
72 let mut fileless = Vec::new();
73 let mut summary = None;
74 let mut previous_error_file: Option<String> = None;
75
76 for line in output.lines() {
77 let trimmed = line.trim_end();
78 if is_summary_line(trimmed) {
79 summary = Some(trimmed.to_string());
80 previous_error_file = None;
81 continue;
82 }
83
84 if let Some((file, severity)) = parse_mypy_line(trimmed) {
85 match severity {
86 "error" => {
87 by_file
88 .entry(file.to_string())
89 .or_default()
90 .push(trimmed.to_string());
91 previous_error_file = Some(file.to_string());
92 }
93 "note" => {
94 if previous_error_file.as_deref() == Some(file) {
95 by_file
96 .entry(file.to_string())
97 .or_default()
98 .push(trimmed.to_string());
99 }
100 }
101 _ => previous_error_file = None,
102 }
103 } else if trimmed.contains("error:") && !trimmed.is_empty() {
104 fileless.push(trimmed.to_string());
105 previous_error_file = None;
106 } else {
107 previous_error_file = None;
108 }
109 }
110
111 let mut lines = Vec::new();
112 lines.extend(fileless);
113 for (_file, diagnostics) in by_file {
114 if !lines.is_empty() && !diagnostics.is_empty() {
115 lines.push(String::new());
116 }
117 lines.extend(diagnostics);
118 }
119 if let Some(summary) = summary {
120 if !lines.is_empty() {
121 lines.push(String::new());
122 }
123 lines.push(summary);
124 }
125
126 if lines.is_empty() {
127 return output.trim_end().to_string();
128 }
129
130 finish(&lines.join("\n"))
131}
132
/// Splits `command` into normalised tokens for matching.
///
/// Each whitespace-separated word has surrounding single and double quotes
/// removed. The wrapper words `npx`, `pnpm`, `yarn`, `bun`, `bunx` and `exec`
/// are then dropped. Every remaining token is reduced to the part after its
/// last `/` or `\`, with trailing `.cmd` and `.exe` suffixes removed.
///
/// Stripping `.exe` lets Windows launchers such as `Scripts\mypy.exe` or
/// `python.exe` normalise to the same tokens as their Unix counterparts.
fn command_tokens(command: &str) -> impl Iterator<Item = String> + '_ {
    command
        .split_whitespace()
        .map(|token| token.trim_matches(|ch| matches!(ch, '\'' | '"')))
        .filter(|token| !matches!(*token, "npx" | "pnpm" | "yarn" | "bun" | "bunx" | "exec"))
        .map(|token| {
            // Keep only the final path component.
            let base = token.rsplit(['/', '\\']).next().unwrap_or(token);
            base.trim_end_matches(".cmd")
                .trim_end_matches(".exe")
                .to_string()
        })
}
147
/// Parses one mypy diagnostic line into `(file, severity)`.
///
/// Accepted shapes are `file:line: severity: ...`, optionally with further
/// numeric fields between the line number and the severity. That covers the
/// single column of `file:line:col:` and the end position of
/// `file:line:col:end_line:end_col:`. A leading Windows drive prefix
/// (`C:\...` or `C:/...`) is treated as part of the file path.
///
/// Returns `None` when the line does not have this shape or the severity is
/// not `error` or `note`.
fn parse_mypy_line(line: &str) -> Option<(&str, &str)> {
    let (file, rest) = split_file_prefix(line)?;
    let (_, rest) = split_number_prefix(rest.trim_start())?;
    let mut rest = rest.strip_prefix(':')?.trim_start();

    // Skip every numeric position field. Each pass consumes at least the
    // colon, so the loop always terminates.
    while let Some(stripped) = strip_column(rest) {
        rest = stripped;
    }

    let (severity, _) = rest.split_once(':')?;
    match severity {
        "error" | "note" => Some((file, severity)),
        _ => None,
    }
}

/// Splits `line` into the file path and the text after the colon that ends it.
///
/// The path normally ends at the first `:`. When that colon follows a single
/// ASCII letter and precedes `\` or `/`, it is taken as a drive separator and
/// the path ends at the next `:` instead.
fn split_file_prefix(line: &str) -> Option<(&str, &str)> {
    let (head, tail) = line.split_once(':')?;

    let has_drive = head.len() == 1
        && head.starts_with(|ch: char| ch.is_ascii_alphabetic())
        && tail.starts_with(['\\', '/']);
    if !has_drive {
        return Some((head, tail));
    }

    let (path, rest) = tail.split_once(':')?;
    // Drive letter + its colon + the remaining path, as one slice of `line`.
    let file = line.get(..head.len() + 1 + path.len())?;
    Some((file, rest))
}

/// Removes one leading `<digits>:` field from `rest` and returns the remainder
/// with leading whitespace trimmed.
///
/// Returns `None` when there is no colon or the text before the first colon
/// contains a non-digit. An empty field (a bare leading `:`) is also accepted.
fn strip_column(rest: &str) -> Option<&str> {
    let (field, tail) = rest.split_once(':')?;
    if field.bytes().all(|byte| byte.is_ascii_digit()) {
        Some(tail.trim_start())
    } else {
        None
    }
}

/// Splits `input` into its leading run of ASCII digits and the remainder.
///
/// Returns `None` when `input` does not start with a digit.
fn split_number_prefix(input: &str) -> Option<(&str, &str)> {
    // ASCII digits are one byte each, so the count is also a valid byte offset.
    let digits = input.bytes().take_while(u8::is_ascii_digit).count();
    if digits == 0 {
        None
    } else {
        Some(input.split_at(digits))
    }
}
183
/// Returns `true` for lines treated as the run summary: any line starting
/// with `Success: no issues found`, or any line that starts with `Found ` and
/// contains ` error` anywhere.
fn is_summary_line(trimmed: &str) -> bool {
    if trimmed.starts_with("Success: no issues found") {
        return true;
    }
    // The substring test runs on the whole line, so it may overlap the prefix.
    trimmed.starts_with("Found ") && trimmed.contains(" error")
}
188
189fn finish(input: &str) -> String {
190 let stripped = strip_ansi(input);
191 let deduped = dedup_consecutive(&stripped);
192 cap_lines(
193 &middle_truncate(&deduped, 32 * 1024, 16 * 1024, 16 * 1024),
194 MAX_LINES,
195 )
196}
197
/// Limits `input` to at most `max_lines` lines.
///
/// Input within the limit is returned with trailing whitespace removed.
/// Otherwise the first `max_lines` lines are kept, joined with `\n`, and
/// followed by a marker line stating how many lines were cut.
fn cap_lines(input: &str, max_lines: usize) -> String {
    let total = input.lines().count();
    if total <= max_lines {
        return input.trim_end().to_string();
    }

    let mut capped = String::new();
    for (position, line) in input.lines().take(max_lines).enumerate() {
        if position > 0 {
            capped.push('\n');
        }
        capped.push_str(line);
    }
    capped.push_str(&format!("\n... truncated {} lines", total - max_lines));
    capped
}
215
#[cfg(test)]
mod tests {
    use super::*;

    /// Command strings that should and should not be routed to this compressor.
    #[test]
    fn matches_mypy_invocations() {
        let accepted = [
            "mypy src",
            "python -m mypy src",
            "python3 -m mypy --strict",
            "uv run mypy src",
        ];
        for command in accepted {
            assert!(MypyCompressor.matches(command));
        }

        for command in ["cargo build", "ls"] {
            assert!(!MypyCompressor.matches(command));
        }
    }

    /// A clean run collapses to the short marker.
    #[test]
    fn compresses_real_success_case() {
        let output = "Success: no issues found in 1 source file\n";
        let compressed = compress_mypy(output);
        assert_eq!(compressed, "mypy: clean");
        assert!(compressed.len() < output.len());
    }

    /// Every error line and the summary survive compression verbatim.
    #[test]
    fn preserves_error_lines_and_summary() {
        let expected_lines = [
            "src/a.py:10: error: Incompatible types in assignment [assignment]",
            "src/a.py:15: error: Missing return statement [return]",
            "src/b.py:5: error: Argument 1 to \"foo\" has incompatible type \"str\"; expected \"int\" [arg-type]",
            "Found 3 errors in 2 files (checked 50 source files)",
        ];
        let output = format!("{}\n", expected_lines.join("\n"));

        let compressed = compress_mypy(&output);

        for line in expected_lines {
            assert!(compressed.contains(line));
        }
    }

    /// Notes are kept only when they directly follow an error in the same file.
    #[test]
    fn keeps_attached_notes_and_drops_standalone_notes() {
        let output = [
            "src/a.py:1: note: Standalone note",
            "src/a.py:10: error: Incompatible types in assignment [assignment]",
            "src/a.py:10: note: Expected int",
            "src/b.py:8: note: Use `Type[X]` for class types",
            "Found 1 error in 1 file (checked 2 source files)",
            "",
        ]
        .join("\n");

        let compressed = compress_mypy(&output);

        assert!(compressed
            .contains("src/a.py:10: error: Incompatible types in assignment [assignment]"));
        assert!(compressed.contains("src/a.py:10: note: Expected int"));
        assert!(!compressed.contains("Standalone note"));
        assert!(!compressed.contains("Use `Type[X]`"));
    }

    /// A large run of standalone notes is dropped, leaving the error and summary.
    #[test]
    fn compresses_large_note_heavy_input() {
        let mut output: String = (0..500)
            .map(|index| {
                format!(
                    "src/file{}.py:{}: note: Standalone informational note that should be dropped\n",
                    index,
                    index + 1
                )
            })
            .collect();
        output.push_str("src/a.py:10: error: Incompatible types in assignment [assignment]\n");
        output.push_str("Found 1 error in 1 file (checked 501 source files)\n");

        let compressed = compress_mypy(&output);

        assert!(compressed
            .contains("src/a.py:10: error: Incompatible types in assignment [assignment]"));
        assert!(compressed.contains("Found 1 error in 1 file"));
        assert!(compressed.len() < output.len() / 2);
        assert!(!compressed.contains("Standalone informational"));
    }
}