Skip to main content

aft/compress/
mypy.rs

1use std::collections::BTreeMap;
2
3use crate::compress::generic::{dedup_consecutive, middle_truncate, strip_ansi};
4use crate::compress::Compressor;
5
6const MAX_LINES: usize = 300;
7
8pub struct MypyCompressor;
9
10impl Compressor for MypyCompressor {
11    fn matches(&self, command: &str) -> bool {
12        let tokens = command_tokens(command).collect::<Vec<_>>();
13        tokens.iter().any(|token| token == "mypy")
14            || tokens
15                .windows(3)
16                .any(|window| matches!(window, [python, flag, module] if (python == "python" || python == "python3") && flag == "-m" && module == "mypy"))
17    }
18
19    fn compress(&self, _command: &str, output: &str) -> String {
20        compress_mypy(output)
21    }
22
23    fn matches_output(&self, output: &str) -> bool {
24        output.lines().any(|line| {
25            let trimmed = line.trim();
26            is_mypy_success_signature(trimmed) || is_mypy_error_summary_signature(trimmed)
27        })
28    }
29}
30
/// Returns `true` when `trimmed` is exactly mypy's success line:
/// `Success: no issues found in <digits> source file` or `... source files`.
///
/// The count must be non-empty ASCII digits and nothing may follow the optional plural `s`.
fn is_mypy_success_signature(trimmed: &str) -> bool {
    let parts = trimmed
        .strip_prefix("Success: no issues found in ")
        .and_then(|tail| tail.split_once(" source file"));

    match parts {
        Some((count, plural)) => {
            let numeric = !count.is_empty() && count.bytes().all(|byte| byte.is_ascii_digit());
            numeric && (plural.is_empty() || plural == "s")
        }
        None => false,
    }
}
40
/// Returns `true` when `trimmed` looks like mypy's failure summary:
/// `Found <digits> error(s) in <digits> file…`.
///
/// Both counts must be non-empty ASCII digits. Whatever follows the file count only has to
/// start with `file`, so `file`, `files` and any trailing `(checked …)` text are accepted.
fn is_mypy_error_summary_signature(trimmed: &str) -> bool {
    fn is_count(field: &str) -> bool {
        !field.is_empty() && field.bytes().all(|byte| byte.is_ascii_digit())
    }

    let Some((error_count, after_errors)) = trimmed
        .strip_prefix("Found ")
        .and_then(|tail| tail.split_once(' '))
    else {
        return false;
    };
    if !is_count(error_count) {
        return false;
    }

    let Some((file_count, after_files)) = ["error in ", "errors in "]
        .iter()
        .find_map(|prefix| after_errors.strip_prefix(*prefix))
        .and_then(|tail| tail.split_once(' '))
    else {
        return false;
    };

    // `starts_with("file")` also covers the plural `files`.
    is_count(file_count) && after_files.starts_with("file")
}
64
65fn compress_mypy(output: &str) -> String {
66    let trimmed = output.trim();
67    if trimmed.starts_with("Success: no issues found") {
68        return "mypy: clean".to_string();
69    }
70
71    let mut by_file: BTreeMap<String, Vec<String>> = BTreeMap::new();
72    let mut fileless = Vec::new();
73    let mut summary = None;
74    let mut previous_error_file: Option<String> = None;
75
76    for line in output.lines() {
77        let trimmed = line.trim_end();
78        if is_summary_line(trimmed) {
79            summary = Some(trimmed.to_string());
80            previous_error_file = None;
81            continue;
82        }
83
84        if let Some((file, severity)) = parse_mypy_line(trimmed) {
85            match severity {
86                "error" => {
87                    by_file
88                        .entry(file.to_string())
89                        .or_default()
90                        .push(trimmed.to_string());
91                    previous_error_file = Some(file.to_string());
92                }
93                "note" => {
94                    if previous_error_file.as_deref() == Some(file) {
95                        by_file
96                            .entry(file.to_string())
97                            .or_default()
98                            .push(trimmed.to_string());
99                    }
100                }
101                _ => previous_error_file = None,
102            }
103        } else if trimmed.contains("error:") && !trimmed.is_empty() {
104            fileless.push(trimmed.to_string());
105            previous_error_file = None;
106        } else {
107            previous_error_file = None;
108        }
109    }
110
111    let mut lines = Vec::new();
112    lines.extend(fileless);
113    for (_file, diagnostics) in by_file {
114        if !lines.is_empty() && !diagnostics.is_empty() {
115            lines.push(String::new());
116        }
117        lines.extend(diagnostics);
118    }
119    if let Some(summary) = summary {
120        if !lines.is_empty() {
121            lines.push(String::new());
122        }
123        lines.push(summary);
124    }
125
126    if lines.is_empty() {
127        return output.trim_end().to_string();
128    }
129
130    finish(&lines.join("\n"))
131}
132
/// Splits `command` into normalised tokens for matching against tool names.
///
/// Tokens are whitespace-separated. Each token has surrounding `'` / `"` characters trimmed,
/// is dropped if it is one of the runner words `npx`, `pnpm`, `yarn`, `bun`, `bunx` or `exec`,
/// and is then reduced to its final path component (splitting on `/` and `\`) with trailing
/// `.cmd` and one trailing `.exe` removed — so `C:\Scripts\mypy.exe` yields `mypy`.
fn command_tokens(command: &str) -> impl Iterator<Item = String> + '_ {
    command
        .split_whitespace()
        .map(|token| token.trim_matches(|ch| matches!(ch, '\'' | '"')))
        .filter(|token| !matches!(*token, "npx" | "pnpm" | "yarn" | "bun" | "bunx" | "exec"))
        .map(|token| {
            let name = token
                .rsplit(['/', '\\'])
                .next()
                .unwrap_or(token)
                .trim_end_matches(".cmd");
            // Windows console-script launchers are `.exe` files; treat them like the bare name.
            name.strip_suffix(".exe").unwrap_or(name).to_string()
        })
}
147
/// Parses a mypy diagnostic of the form `file:line[:column…]: severity: message`.
///
/// Returns the file path and the severity (`"error"` or `"note"`), both borrowed from `line`,
/// or `None` when the line does not have that shape. A Windows drive prefix such as `C:\` is
/// kept as part of the file path, and any number of numeric position fields after the line
/// number (as printed with `--show-column` / `--show-error-end`) is skipped.
fn parse_mypy_line(line: &str) -> Option<(&str, &str)> {
    let (file, rest) = split_file_prefix(line)?;
    let (_, rest) = split_number_prefix(rest.trim_start())?;
    let mut rest = rest.strip_prefix(':')?.trim_start();
    // Each successful strip consumes at least one digit and a colon, so this terminates.
    while let Some(stripped) = strip_column(rest) {
        rest = stripped;
    }
    let (severity, _) = rest.split_once(':')?;
    if matches!(severity, "error" | "note") {
        Some((file, severity))
    } else {
        None
    }
}

/// Splits `line` at the colon that terminates the file path, returning `(file, remainder)`.
///
/// When the line starts with a drive prefix (`<letter>:\` or `<letter>:/`), that first colon
/// belongs to the path and the search starts after it.
fn split_file_prefix(line: &str) -> Option<(&str, &str)> {
    let skip = match line.as_bytes() {
        [drive, b':', b'\\' | b'/', ..] if drive.is_ascii_alphabetic() => 2,
        _ => 0,
    };
    // `skip` only ever covers ASCII bytes and ':' is one byte, so both slices are on boundaries.
    let colon = skip + line.get(skip..)?.find(':')?;
    Some((&line[..colon], &line[colon + 1..]))
}

/// Removes one leading `<digits>:` field from `rest` and returns what follows, with leading
/// whitespace trimmed. Returns `None` when the text before the first colon is empty or is
/// not made of ASCII digits only.
fn strip_column(rest: &str) -> Option<&str> {
    let (maybe_column, tail) = rest.split_once(':')?;
    // `all` is vacuously true for an empty field, so emptiness is rejected explicitly.
    let numeric = !maybe_column.is_empty() && maybe_column.bytes().all(|b| b.is_ascii_digit());
    if numeric {
        Some(tail.trim_start())
    } else {
        None
    }
}

/// Splits `input` into its leading run of ASCII digits and the remainder.
///
/// Returns `None` when `input` does not start with a digit.
fn split_number_prefix(input: &str) -> Option<(&str, &str)> {
    let digits = input.bytes().take_while(u8::is_ascii_digit).count();
    if digits == 0 {
        return None;
    }
    // Only ASCII bytes were counted, so `digits` is a valid char boundary.
    Some(input.split_at(digits))
}
183
/// Loose check for a mypy summary line.
///
/// A line qualifies when it starts with `Success: no issues found`, or when it starts with
/// `Found ` and contains ` error` anywhere (the search covers the whole line, prefix included).
fn is_summary_line(trimmed: &str) -> bool {
    if trimmed.starts_with("Success: no issues found") {
        return true;
    }
    trimmed.starts_with("Found ") && trimmed.contains(" error")
}
188
189fn finish(input: &str) -> String {
190    let stripped = strip_ansi(input);
191    let deduped = dedup_consecutive(&stripped);
192    cap_lines(
193        &middle_truncate(&deduped, 32 * 1024, 16 * 1024, 16 * 1024),
194        MAX_LINES,
195    )
196}
197
/// Limits `input` to at most `max_lines` lines.
///
/// Input within the limit is returned with trailing whitespace removed. Otherwise only the
/// first `max_lines` lines are kept, followed by a `... truncated N lines` marker where `N`
/// is the number of lines dropped.
fn cap_lines(input: &str, max_lines: usize) -> String {
    let total = input.lines().count();
    if total <= max_lines {
        return input.trim_end().to_string();
    }

    let head: Vec<&str> = input.lines().take(max_lines).collect();
    let dropped = total - max_lines;
    format!("{}\n... truncated {} lines", head.join("\n"), dropped)
}
215
#[cfg(test)]
mod tests {
    use super::*;

    /// Error diagnostic shared by several fixtures below.
    const ASSIGNMENT_ERROR: &str =
        "src/a.py:10: error: Incompatible types in assignment  [assignment]";

    /// Joins fixture lines into mypy-style output with a trailing newline.
    fn mypy_output(lines: &[&str]) -> String {
        format!("{}\n", lines.join("\n"))
    }

    #[test]
    fn matches_mypy_invocations() {
        let compressor = MypyCompressor;

        for command in [
            "mypy src",
            "python -m mypy src",
            "python3 -m mypy --strict",
            "uv run mypy src",
        ] {
            assert!(compressor.matches(command), "expected a match for {command:?}");
        }

        for command in ["cargo build", "ls"] {
            assert!(!compressor.matches(command), "unexpected match for {command:?}");
        }
    }

    #[test]
    fn compresses_real_success_case() {
        let output = "Success: no issues found in 1 source file\n";
        let compressed = compress_mypy(output);
        assert_eq!(compressed, "mypy: clean");
        assert!(compressed.len() < output.len());
    }

    #[test]
    fn preserves_error_lines_and_summary() {
        let expected = [
            ASSIGNMENT_ERROR,
            "src/a.py:15: error: Missing return statement  [return]",
            "src/b.py:5: error: Argument 1 to \"foo\" has incompatible type \"str\"; expected \"int\"  [arg-type]",
            "Found 3 errors in 2 files (checked 50 source files)",
        ];

        let compressed = compress_mypy(&mypy_output(&expected));

        for line in expected {
            assert!(compressed.contains(line), "missing line {line:?}");
        }
    }

    #[test]
    fn keeps_attached_notes_and_drops_standalone_notes() {
        let output = mypy_output(&[
            "src/a.py:1: note: Standalone note",
            ASSIGNMENT_ERROR,
            "src/a.py:10: note: Expected int",
            "src/b.py:8: note: Use `Type[X]` for class types",
            "Found 1 error in 1 file (checked 2 source files)",
        ]);

        let compressed = compress_mypy(&output);

        assert!(compressed.contains(ASSIGNMENT_ERROR));
        assert!(compressed.contains("src/a.py:10: note: Expected int"));
        assert!(!compressed.contains("Standalone note"));
        assert!(!compressed.contains("Use `Type[X]`"));
    }

    #[test]
    fn compresses_large_note_heavy_input() {
        let mut output: String = (0..500)
            .map(|index| {
                format!(
                    "src/file{}.py:{}: note: Standalone informational note that should be dropped\n",
                    index,
                    index + 1
                )
            })
            .collect();
        output.push_str(ASSIGNMENT_ERROR);
        output.push('\n');
        output.push_str("Found 1 error in 1 file (checked 501 source files)\n");

        let compressed = compress_mypy(&output);

        assert!(compressed.contains(ASSIGNMENT_ERROR));
        assert!(compressed.contains("Found 1 error in 1 file"));
        assert!(compressed.len() < output.len() / 2);
        assert!(!compressed.contains("Standalone informational"));
    }
}