Skip to main content

aft/compress/
mypy.rs

1use std::collections::BTreeMap;
2
3use crate::compress::generic::{dedup_consecutive, middle_truncate, strip_ansi, GenericCompressor};
4use crate::compress::{CompressionResult, Compressor};
5
6const MAX_LINES: usize = 300;
7
8pub struct MypyCompressor;
9
10impl Compressor for MypyCompressor {
11    fn matches(&self, command: &str) -> bool {
12        let tokens = command_tokens(command).collect::<Vec<_>>();
13        tokens.iter().any(|token| token == "mypy")
14            || tokens
15                .windows(3)
16                .any(|window| matches!(window, [python, flag, module] if (python == "python" || python == "python3") && flag == "-m" && module == "mypy"))
17    }
18
19    fn compress_with_exit_code(
20        &self,
21        _command: &str,
22        output: &str,
23        exit_code: Option<i32>,
24    ) -> CompressionResult {
25        if matches!(exit_code, Some(code) if code != 0)
26            && output.trim().starts_with("Success: no issues found")
27        {
28            return GenericCompressor::compress_output(output).into();
29        }
30        compress_mypy(output).into()
31    }
32
33    fn matches_output(&self, output: &str) -> bool {
34        output.lines().any(|line| {
35            let trimmed = line.trim();
36            is_mypy_success_signature(trimmed) || is_mypy_error_summary_signature(trimmed)
37        })
38    }
39}
40
41fn is_mypy_success_signature(trimmed: &str) -> bool {
42    let Some(rest) = trimmed.strip_prefix("Success: no issues found in ") else {
43        return false;
44    };
45    let Some((count, rest)) = rest.split_once(" source file") else {
46        return false;
47    };
48    !count.is_empty() && count.chars().all(|ch| ch.is_ascii_digit()) && matches!(rest, "" | "s")
49}
50
51fn is_mypy_error_summary_signature(trimmed: &str) -> bool {
52    let Some(rest) = trimmed.strip_prefix("Found ") else {
53        return false;
54    };
55    let Some((error_count, rest)) = rest.split_once(' ') else {
56        return false;
57    };
58    if error_count.is_empty() || !error_count.chars().all(|ch| ch.is_ascii_digit()) {
59        return false;
60    }
61    let Some(rest) = rest
62        .strip_prefix("error in ")
63        .or_else(|| rest.strip_prefix("errors in "))
64    else {
65        return false;
66    };
67    let Some((file_count, rest)) = rest.split_once(' ') else {
68        return false;
69    };
70    !file_count.is_empty()
71        && file_count.chars().all(|ch| ch.is_ascii_digit())
72        && (rest.starts_with("file") || rest.starts_with("files"))
73}
74
75fn compress_mypy(output: &str) -> String {
76    let trimmed = output.trim();
77    if trimmed.starts_with("Success: no issues found") {
78        return "mypy: clean".to_string();
79    }
80
81    let mut by_file: BTreeMap<String, Vec<String>> = BTreeMap::new();
82    let mut fileless = Vec::new();
83    let mut summary = None;
84    let mut previous_error_file: Option<String> = None;
85
86    for line in output.lines() {
87        let trimmed = line.trim_end();
88        if is_summary_line(trimmed) {
89            summary = Some(trimmed.to_string());
90            previous_error_file = None;
91            continue;
92        }
93
94        if let Some((file, severity)) = parse_mypy_line(trimmed) {
95            match severity {
96                "error" => {
97                    by_file
98                        .entry(file.to_string())
99                        .or_default()
100                        .push(trimmed.to_string());
101                    previous_error_file = Some(file.to_string());
102                }
103                "note" => {
104                    if previous_error_file.as_deref() == Some(file) {
105                        by_file
106                            .entry(file.to_string())
107                            .or_default()
108                            .push(trimmed.to_string());
109                    }
110                }
111                _ => previous_error_file = None,
112            }
113        } else if trimmed.contains("error:") && !trimmed.is_empty() {
114            fileless.push(trimmed.to_string());
115            previous_error_file = None;
116        } else {
117            previous_error_file = None;
118        }
119    }
120
121    let mut lines = Vec::new();
122    lines.extend(fileless);
123    for (_file, diagnostics) in by_file {
124        if !lines.is_empty() && !diagnostics.is_empty() {
125            lines.push(String::new());
126        }
127        lines.extend(diagnostics);
128    }
129    if let Some(summary) = summary {
130        if !lines.is_empty() {
131            lines.push(String::new());
132        }
133        lines.push(summary);
134    }
135
136    if lines.is_empty() {
137        return output.trim_end().to_string();
138    }
139
140    finish(&lines.join("\n"))
141}
142
143fn command_tokens(command: &str) -> impl Iterator<Item = String> + '_ {
144    command
145        .split_whitespace()
146        .map(|token| token.trim_matches(|ch| matches!(ch, '\'' | '"')))
147        .filter(|token| !matches!(*token, "npx" | "pnpm" | "yarn" | "bun" | "bunx" | "exec"))
148        .map(|token| {
149            token
150                .rsplit(['/', '\\'])
151                .next()
152                .unwrap_or(token)
153                .trim_end_matches(".cmd")
154                .to_string()
155        })
156}
157
158fn parse_mypy_line(line: &str) -> Option<(&str, &str)> {
159    let (file, rest) = line.split_once(':')?;
160    let rest = rest.trim_start();
161    let (_, rest) = split_number_prefix(rest)?;
162    let rest = rest.strip_prefix(':')?.trim_start();
163    let rest = if let Some(stripped) = strip_column(rest) {
164        stripped
165    } else {
166        rest
167    };
168    let (severity, _) = rest.split_once(':')?;
169    if matches!(severity, "error" | "note") {
170        Some((file, severity))
171    } else {
172        None
173    }
174}
175
176fn strip_column(rest: &str) -> Option<&str> {
177    let (maybe_column, tail) = rest.split_once(':')?;
178    if maybe_column.chars().all(|ch| ch.is_ascii_digit()) {
179        Some(tail.trim_start())
180    } else {
181        None
182    }
183}
184
185fn split_number_prefix(input: &str) -> Option<(&str, &str)> {
186    let digits = input
187        .char_indices()
188        .take_while(|(_, ch)| ch.is_ascii_digit())
189        .last()
190        .map(|(index, ch)| index + ch.len_utf8())?;
191    Some(input.split_at(digits))
192}
193
194fn is_summary_line(trimmed: &str) -> bool {
195    (trimmed.starts_with("Found ") && trimmed.contains(" error"))
196        || trimmed.starts_with("Success: no issues found")
197}
198
199fn finish(input: &str) -> String {
200    let stripped = strip_ansi(input);
201    let deduped = dedup_consecutive(&stripped);
202    cap_lines(
203        &middle_truncate(&deduped, 32 * 1024, 16 * 1024, 16 * 1024),
204        MAX_LINES,
205    )
206}
207
208fn cap_lines(input: &str, max_lines: usize) -> String {
209    let lines: Vec<&str> = input.lines().collect();
210    if lines.len() <= max_lines {
211        return input.trim_end().to_string();
212    }
213    let mut kept = lines
214        .iter()
215        .take(max_lines)
216        .copied()
217        .collect::<Vec<_>>()
218        .join("\n");
219    kept.push_str(&format!(
220        "\n... truncated {} lines",
221        lines.len() - max_lines
222    ));
223    kept
224}
225
226#[cfg(test)]
227mod tests {
228    use super::*;
229
230    #[test]
231    fn matches_mypy_invocations() {
232        let compressor = MypyCompressor;
233        assert!(compressor.matches("mypy src"));
234        assert!(compressor.matches("python -m mypy src"));
235        assert!(compressor.matches("python3 -m mypy --strict"));
236        assert!(compressor.matches("uv run mypy src"));
237        assert!(!compressor.matches("cargo build"));
238        assert!(!compressor.matches("ls"));
239    }
240
241    #[test]
242    fn compresses_real_success_case() {
243        let output = "Success: no issues found in 1 source file\n";
244        let compressed = compress_mypy(output);
245        assert_eq!(compressed, "mypy: clean");
246        assert!(compressed.len() < output.len());
247    }
248
249    #[test]
250    fn preserves_error_lines_and_summary() {
251        let output = "src/a.py:10: error: Incompatible types in assignment  [assignment]\nsrc/a.py:15: error: Missing return statement  [return]\nsrc/b.py:5: error: Argument 1 to \"foo\" has incompatible type \"str\"; expected \"int\"  [arg-type]\nFound 3 errors in 2 files (checked 50 source files)\n";
252        let compressed = compress_mypy(output);
253        assert!(compressed
254            .contains("src/a.py:10: error: Incompatible types in assignment  [assignment]"));
255        assert!(compressed.contains("src/a.py:15: error: Missing return statement  [return]"));
256        assert!(compressed.contains("src/b.py:5: error: Argument 1 to \"foo\" has incompatible type \"str\"; expected \"int\"  [arg-type]"));
257        assert!(compressed.contains("Found 3 errors in 2 files (checked 50 source files)"));
258    }
259
260    #[test]
261    fn keeps_attached_notes_and_drops_standalone_notes() {
262        let output = "src/a.py:1: note: Standalone note\nsrc/a.py:10: error: Incompatible types in assignment  [assignment]\nsrc/a.py:10: note: Expected int\nsrc/b.py:8: note: Use `Type[X]` for class types\nFound 1 error in 1 file (checked 2 source files)\n";
263        let compressed = compress_mypy(output);
264        assert!(compressed
265            .contains("src/a.py:10: error: Incompatible types in assignment  [assignment]"));
266        assert!(compressed.contains("src/a.py:10: note: Expected int"));
267        assert!(!compressed.contains("Standalone note"));
268        assert!(!compressed.contains("Use `Type[X]`"));
269    }
270
271    #[test]
272    fn compresses_large_note_heavy_input() {
273        let mut output = String::new();
274        for index in 0..500 {
275            output.push_str(&format!(
276                "src/file{}.py:{}: note: Standalone informational note that should be dropped\n",
277                index,
278                index + 1
279            ));
280        }
281        output.push_str("src/a.py:10: error: Incompatible types in assignment  [assignment]\n");
282        output.push_str("Found 1 error in 1 file (checked 501 source files)\n");
283        let compressed = compress_mypy(&output);
284        assert!(compressed
285            .contains("src/a.py:10: error: Incompatible types in assignment  [assignment]"));
286        assert!(compressed.contains("Found 1 error in 1 file"));
287        assert!(compressed.len() < output.len() / 2);
288        assert!(!compressed.contains("Standalone informational"));
289    }
290}