// aft/compress/mypy.rs

use std::collections::BTreeMap;

use crate::compress::generic::{dedup_consecutive, middle_truncate, strip_ansi};
use crate::compress::Compressor;
5
/// Upper bound on the number of lines kept in compressed output; `finish`
/// passes it to `cap_lines`, which replaces any overflow with a one-line marker.
const MAX_LINES: usize = 300;
7
/// Stateless compressor for `mypy` type-checker output.
///
/// Carries no data; all behavior is provided by its `Compressor` implementation.
pub struct MypyCompressor;
9
10impl Compressor for MypyCompressor {
11    fn matches(&self, command: &str) -> bool {
12        let tokens = command_tokens(command).collect::<Vec<_>>();
13        tokens.iter().any(|token| token == "mypy")
14            || tokens
15                .windows(3)
16                .any(|window| matches!(window, [python, flag, module] if (python == "python" || python == "python3") && flag == "-m" && module == "mypy"))
17    }
18
19    fn compress(&self, _command: &str, output: &str) -> String {
20        compress_mypy(output)
21    }
22}
23
24fn compress_mypy(output: &str) -> String {
25    let trimmed = output.trim();
26    if trimmed.starts_with("Success: no issues found") {
27        return "mypy: clean".to_string();
28    }
29
30    let mut by_file: BTreeMap<String, Vec<String>> = BTreeMap::new();
31    let mut fileless = Vec::new();
32    let mut summary = None;
33    let mut previous_error_file: Option<String> = None;
34
35    for line in output.lines() {
36        let trimmed = line.trim_end();
37        if is_summary_line(trimmed) {
38            summary = Some(trimmed.to_string());
39            previous_error_file = None;
40            continue;
41        }
42
43        if let Some((file, severity)) = parse_mypy_line(trimmed) {
44            match severity {
45                "error" => {
46                    by_file
47                        .entry(file.to_string())
48                        .or_default()
49                        .push(trimmed.to_string());
50                    previous_error_file = Some(file.to_string());
51                }
52                "note" => {
53                    if previous_error_file.as_deref() == Some(file) {
54                        by_file
55                            .entry(file.to_string())
56                            .or_default()
57                            .push(trimmed.to_string());
58                    }
59                }
60                _ => previous_error_file = None,
61            }
62        } else if trimmed.contains("error:") && !trimmed.is_empty() {
63            fileless.push(trimmed.to_string());
64            previous_error_file = None;
65        } else {
66            previous_error_file = None;
67        }
68    }
69
70    let mut lines = Vec::new();
71    lines.extend(fileless);
72    for (_file, diagnostics) in by_file {
73        if !lines.is_empty() && !diagnostics.is_empty() {
74            lines.push(String::new());
75        }
76        lines.extend(diagnostics);
77    }
78    if let Some(summary) = summary {
79        if !lines.is_empty() {
80            lines.push(String::new());
81        }
82        lines.push(summary);
83    }
84
85    if lines.is_empty() {
86        return output.trim_end().to_string();
87    }
88
89    finish(&lines.join("\n"))
90}
91
/// Splits `command` into normalised tokens for tool detection.
///
/// Each whitespace-separated token is processed as follows:
/// 1. surrounding single/double quotes are trimmed;
/// 2. the wrapper words `npx`, `pnpm`, `yarn`, `bun`, `bunx` and `exec` are dropped;
/// 3. any directory prefix (`/` or `\` separated) is removed, keeping the basename;
/// 4. one trailing `.cmd` or `.exe` launcher suffix is removed, so Windows
///    shims such as `mypy.cmd` and `mypy.exe` both normalise to `mypy`.
fn command_tokens(command: &str) -> impl Iterator<Item = String> + '_ {
    command
        .split_whitespace()
        .map(|token| token.trim_matches(|ch| matches!(ch, '\'' | '"')))
        .filter(|token| !matches!(*token, "npx" | "pnpm" | "yarn" | "bun" | "bunx" | "exec"))
        .map(|token| {
            let basename = token.rsplit(['/', '\\']).next().unwrap_or(token);
            basename
                .strip_suffix(".cmd")
                .or_else(|| basename.strip_suffix(".exe"))
                .unwrap_or(basename)
                .to_string()
        })
}
106
/// Parses one located mypy diagnostic and returns `(file, severity)`.
///
/// Accepted shapes are `file:line: severity: ...`, `file:line:col: severity: ...`
/// and the longer `file:line:col:end_line:end_col: severity: ...` form, where
/// `severity` is exactly `error` or `note`. The file may carry a Windows drive
/// prefix (`C:\src\a.py` or `C:/src/a.py`). Anything else yields `None`.
fn parse_mypy_line(line: &str) -> Option<(&str, &str)> {
    let (file, rest) = split_file_prefix(line)?;
    let (_, rest) = split_number_prefix(rest.trim_start())?;
    let mut rest = rest.strip_prefix(':')?.trim_start();
    // Skip column / end-line / end-column fields, however many are present.
    // Each iteration consumes at least one ':' so the loop always terminates.
    while let Some(after_field) = strip_column(rest) {
        rest = after_field;
    }
    let (severity, _) = rest.split_once(':')?;
    if matches!(severity, "error" | "note") {
        Some((file, severity))
    } else {
        None
    }
}

/// Splits `line` into the file path and the text after the colon that follows it.
///
/// Normally that is the first colon. When the text before the first colon is a
/// single ASCII letter immediately followed by `\` or `/`, it is treated as a
/// Windows drive prefix and the split happens at the next colon instead.
fn split_file_prefix(line: &str) -> Option<(&str, &str)> {
    let (head, tail) = line.split_once(':')?;
    let is_drive = head.len() == 1
        && head.bytes().all(|byte| byte.is_ascii_alphabetic())
        && tail.starts_with(['\\', '/']);
    if !is_drive {
        return Some((head, tail));
    }
    // Offset of the next ':' within `line`: skip the drive letter and its colon.
    let colon = head.len() + 1 + tail.find(':')?;
    Some((&line[..colon], &line[colon + 1..]))
}

/// If `rest` starts with a non-empty run of ASCII digits followed by `:`,
/// returns what follows that colon with leading whitespace trimmed.
fn strip_column(rest: &str) -> Option<&str> {
    let (field, tail) = rest.split_once(':')?;
    let numeric = !field.is_empty() && field.bytes().all(|byte| byte.is_ascii_digit());
    if numeric {
        Some(tail.trim_start())
    } else {
        None
    }
}

/// Splits `input` into its leading ASCII-digit run and the remainder.
///
/// Returns `None` when `input` does not start with a digit.
fn split_number_prefix(input: &str) -> Option<(&str, &str)> {
    let end = input
        .find(|ch: char| !ch.is_ascii_digit())
        .unwrap_or(input.len());
    if end == 0 {
        None
    } else {
        Some(input.split_at(end))
    }
}
142
/// Reports whether `trimmed` is one of mypy's closing summary lines.
///
/// A line counts when it starts with `Found ` and contains ` error` anywhere
/// (e.g. `Found 3 errors in 2 files ...`), or when it starts with
/// `Success: no issues found`. The caller is expected to pass a line with
/// no leading whitespace; a leading space defeats both prefix checks.
fn is_summary_line(trimmed: &str) -> bool {
    let error_summary = trimmed.starts_with("Found ") && trimmed.contains(" error");
    error_summary || trimmed.starts_with("Success: no issues found")
}
147
148fn finish(input: &str) -> String {
149    let stripped = strip_ansi(input);
150    let deduped = dedup_consecutive(&stripped);
151    cap_lines(
152        &middle_truncate(&deduped, 32 * 1024, 16 * 1024, 16 * 1024),
153        MAX_LINES,
154    )
155}
156
/// Limits `input` to at most `max_lines` lines.
///
/// When the input already fits, it is returned with trailing whitespace
/// trimmed. Otherwise the first `max_lines` lines are kept, joined with `\n`,
/// followed by a final `... truncated N lines` marker line, where `N` is the
/// number of lines dropped.
fn cap_lines(input: &str, max_lines: usize) -> String {
    let all_lines: Vec<&str> = input.lines().collect();
    let total = all_lines.len();
    if total <= max_lines {
        return input.trim_end().to_string();
    }

    // `max_lines < total` here, so the slice is always in bounds.
    let head = all_lines[..max_lines].join("\n");
    format!("{}\n... truncated {} lines", head, total - max_lines)
}
174
#[cfg(test)]
mod tests {
    use super::*;

    // Command detection: bare, `python -m`, and runner-prefixed invocations
    // match; unrelated commands do not.
    #[test]
    fn matches_mypy_invocations() {
        let compressor = MypyCompressor;
        assert!(compressor.matches("mypy src"));
        assert!(compressor.matches("python -m mypy src"));
        assert!(compressor.matches("python3 -m mypy --strict"));
        assert!(compressor.matches("uv run mypy src"));
        assert!(!compressor.matches("cargo build"));
        assert!(!compressor.matches("ls"));
    }

    // A clean run collapses to the short `mypy: clean` marker.
    #[test]
    fn compresses_real_success_case() {
        let output = "Success: no issues found in 1 source file\n";
        let compressed = compress_mypy(output);
        assert_eq!(compressed, "mypy: clean");
        assert!(compressed.len() < output.len());
    }

    // Every error line and the closing summary survive compression verbatim.
    #[test]
    fn preserves_error_lines_and_summary() {
        let output = "src/a.py:10: error: Incompatible types in assignment  [assignment]\nsrc/a.py:15: error: Missing return statement  [return]\nsrc/b.py:5: error: Argument 1 to \"foo\" has incompatible type \"str\"; expected \"int\"  [arg-type]\nFound 3 errors in 2 files (checked 50 source files)\n";
        let compressed = compress_mypy(output);
        assert!(compressed
            .contains("src/a.py:10: error: Incompatible types in assignment  [assignment]"));
        assert!(compressed.contains("src/a.py:15: error: Missing return statement  [return]"));
        assert!(compressed.contains("src/b.py:5: error: Argument 1 to \"foo\" has incompatible type \"str\"; expected \"int\"  [arg-type]"));
        assert!(compressed.contains("Found 3 errors in 2 files (checked 50 source files)"));
    }

    // Notes directly following an error in the same file are kept; notes with
    // no preceding error, or for a different file, are dropped.
    #[test]
    fn keeps_attached_notes_and_drops_standalone_notes() {
        let output = "src/a.py:1: note: Standalone note\nsrc/a.py:10: error: Incompatible types in assignment  [assignment]\nsrc/a.py:10: note: Expected int\nsrc/b.py:8: note: Use `Type[X]` for class types\nFound 1 error in 1 file (checked 2 source files)\n";
        let compressed = compress_mypy(output);
        assert!(compressed
            .contains("src/a.py:10: error: Incompatible types in assignment  [assignment]"));
        assert!(compressed.contains("src/a.py:10: note: Expected int"));
        assert!(!compressed.contains("Standalone note"));
        assert!(!compressed.contains("Use `Type[X]`"));
    }

    // 500 standalone notes followed by one error: the notes vanish and the
    // result is less than half the input size.
    #[test]
    fn compresses_large_note_heavy_input() {
        let mut output = String::new();
        for index in 0..500 {
            output.push_str(&format!(
                "src/file{}.py:{}: note: Standalone informational note that should be dropped\n",
                index,
                index + 1
            ));
        }
        output.push_str("src/a.py:10: error: Incompatible types in assignment  [assignment]\n");
        output.push_str("Found 1 error in 1 file (checked 501 source files)\n");
        let compressed = compress_mypy(&output);
        assert!(compressed
            .contains("src/a.py:10: error: Incompatible types in assignment  [assignment]"));
        assert!(compressed.contains("Found 1 error in 1 file"));
        assert!(compressed.len() < output.len() / 2);
        assert!(!compressed.contains("Standalone informational"));
    }
}