Skip to main content

aft/compress/
pytest.rs

1use crate::compress::caps::{cap_classified_blocks, ClassifiedBlock, DropClass};
2use crate::compress::generic::GenericCompressor;
3use crate::compress::{CompressionResult, Compressor};
4
5pub struct PytestCompressor;
6
7impl Compressor for PytestCompressor {
8    fn matches(&self, command: &str) -> bool {
9        let tokens: Vec<&str> = command.split_whitespace().collect();
10        tokens.first().is_some_and(|head| *head == "pytest")
11            || tokens
12                .windows(3)
13                .any(|window| matches!(window, ["python" | "python3", "-m", "pytest"]))
14    }
15
16    fn compress_with_exit_code(
17        &self,
18        _command: &str,
19        output: &str,
20        _exit_code: Option<i32>,
21    ) -> CompressionResult {
22        compress_pytest(output)
23    }
24
25    fn matches_output(&self, output: &str) -> bool {
26        output.lines().any(|line| {
27            let trimmed = line.trim();
28            is_section_header(trimmed, "FAILURES")
29                || is_section_header(trimmed, "ERRORS")
30                || is_section_header(trimmed, "short test summary info")
31                || is_pytest_final_summary_signature(trimmed)
32        })
33    }
34}
35
36fn compress_pytest(output: &str) -> CompressionResult {
37    let lines: Vec<&str> = output.lines().collect();
38    let mut blocks = Vec::new();
39    let mut index = 0usize;
40
41    while index < lines.len() {
42        let line = lines[index];
43        let trimmed = line.trim();
44
45        if is_header_line(trimmed) || is_failure_or_error_test_line(trimmed) {
46            blocks.push(ClassifiedBlock::unclassified(line.to_string()));
47            index += 1;
48            continue;
49        }
50
51        if is_section_header(trimmed, "FAILURES") || is_section_header(trimmed, "ERRORS") {
52            let class = if is_section_header(trimmed, "ERRORS") {
53                DropClass::Error
54            } else {
55                DropClass::Failure
56            };
57            let (section_blocks, next_index) = compress_failure_section(&lines, index, class);
58            blocks.extend(section_blocks);
59            index = next_index;
60            continue;
61        }
62
63        if is_section_header(trimmed, "warnings summary") {
64            let (warnings, next_index) = compress_warnings(&lines, index);
65            blocks.extend(warnings);
66            index = next_index;
67            continue;
68        }
69
70        if is_section_header(trimmed, "short test summary info") || is_final_summary(trimmed) {
71            blocks.push(ClassifiedBlock::unclassified(line.to_string()));
72            index += 1;
73            continue;
74        }
75
76        if is_pass_status_line(trimmed) {
77            index += 1;
78            continue;
79        }
80
81        index += 1;
82    }
83
84    let capped = cap_classified_blocks(blocks);
85    let compressed = CompressionResult::with_class_drops(
86        trim_trailing_lines(&capped.text),
87        capped.dropped_by_class,
88    );
89    preserve_pytest_failure(output, compressed)
90}
91
92fn preserve_pytest_failure(output: &str, compressed: CompressionResult) -> CompressionResult {
93    let stripped_failure =
94        compressed.text.trim().is_empty() || !super::text_has_failure_signal(&compressed.text);
95    if !output.trim().is_empty() && super::text_has_failure_signal(output) && stripped_failure {
96        GenericCompressor::compress_output(output).into()
97    } else {
98        compressed
99    }
100}
101
102fn compress_failure_section(
103    lines: &[&str],
104    start: usize,
105    class: DropClass,
106) -> (Vec<ClassifiedBlock>, usize) {
107    let mut blocks = vec![ClassifiedBlock::unclassified(lines[start].to_string())];
108    let mut index = start + 1;
109    let mut current: Vec<String> = Vec::new();
110
111    while index < lines.len() {
112        let line = lines[index];
113        let trimmed = line.trim();
114        if is_recognized_section_boundary(trimmed) {
115            break;
116        }
117        if is_pytest_case_header(trimmed) && !current.is_empty() {
118            blocks.push(ClassifiedBlock::new(class, current.join("\n")));
119            current.clear();
120        }
121        current.push(line.to_string());
122        index += 1;
123    }
124
125    if !current.is_empty() {
126        blocks.push(ClassifiedBlock::new(class, current.join("\n")));
127    }
128
129    (blocks, index)
130}
131
132fn is_recognized_section_boundary(trimmed: &str) -> bool {
133    is_section_header(trimmed, "FAILURES")
134        || is_section_header(trimmed, "ERRORS")
135        || is_section_header(trimmed, "warnings summary")
136        || is_section_header(trimmed, "short test summary info")
137        || is_final_summary(trimmed)
138}
139
140fn is_pytest_case_header(trimmed: &str) -> bool {
141    (trimmed.starts_with('_') && trimmed.ends_with('_'))
142        || trimmed.starts_with("ERROR at ")
143        || trimmed.starts_with("FAILED ")
144        || trimmed.starts_with("ERROR ")
145}
146
147fn is_header_line(trimmed: &str) -> bool {
148    trimmed.starts_with("platform ")
149        || trimmed.starts_with("rootdir:")
150        || trimmed.starts_with("collected ")
151}
152
153fn is_failure_or_error_test_line(trimmed: &str) -> bool {
154    trimmed.contains(" FAILED")
155        || trimmed.ends_with(" FAILED")
156        || trimmed.contains(" ERROR")
157        || trimmed.ends_with(" ERROR")
158}
159
160fn is_section_header(trimmed: &str, name: &str) -> bool {
161    trimmed.starts_with('=') && trimmed.contains(name) && trimmed.ends_with('=')
162}
163
164fn is_pass_status_line(trimmed: &str) -> bool {
165    !trimmed.is_empty()
166        && (trimmed
167            .chars()
168            .all(|char| matches!(char, '.' | 's' | 'x' | 'X'))
169            || trimmed.ends_with(" PASSED")
170            || trimmed.contains(" PASSED "))
171}
172
173fn is_pytest_final_summary_signature(trimmed: &str) -> bool {
174    if !trimmed.starts_with('=') || !trimmed.ends_with('=') {
175        return false;
176    }
177    let body = trimmed.trim_matches('=').trim();
178    let has_status = body
179        .split(|ch: char| !ch.is_ascii_alphabetic())
180        .any(|word| matches!(word, "passed" | "failed" | "error" | "errors"));
181    if !has_status {
182        return false;
183    }
184    let Some((_, after_in)) = body.rsplit_once(" in ") else {
185        return false;
186    };
187    let Some(duration) = after_in.split_whitespace().next() else {
188        return false;
189    };
190    let Some(seconds) = duration.strip_suffix('s') else {
191        return false;
192    };
193    !seconds.is_empty() && seconds.chars().all(|ch| ch.is_ascii_digit() || ch == '.')
194}
195
196fn is_final_summary(trimmed: &str) -> bool {
197    trimmed.starts_with('=')
198        && (trimmed.contains(" passed")
199            || trimmed.contains(" failed")
200            || trimmed.contains(" error")
201            || trimmed.contains(" skipped")
202            || trimmed.contains(" xfailed"))
203        && trimmed.ends_with('=')
204}
205
206fn compress_warnings(lines: &[&str], start: usize) -> (Vec<ClassifiedBlock>, usize) {
207    let mut blocks = vec![ClassifiedBlock::unclassified(lines[start].to_string())];
208    let mut index = start + 1;
209    let mut current: Vec<String> = Vec::new();
210
211    while index < lines.len() {
212        let line = lines[index];
213        let trimmed = line.trim();
214        if trimmed.starts_with('=') && trimmed.ends_with('=') {
215            break;
216        }
217        if is_warning_entry(trimmed) && !current.is_empty() {
218            blocks.push(ClassifiedBlock::new(DropClass::Warning, current.join("\n")));
219            current.clear();
220        }
221        current.push(line.to_string());
222        index += 1;
223    }
224
225    if !current.is_empty() {
226        blocks.push(ClassifiedBlock::new(DropClass::Warning, current.join("\n")));
227    }
228
229    (blocks, index)
230}
231
232fn is_warning_entry(trimmed: &str) -> bool {
233    trimmed.contains("Warning:") || trimmed.contains("warning:") || trimmed.starts_with("tests/")
234}
235
236fn trim_trailing_lines(input: &str) -> String {
237    input
238        .lines()
239        .map(str::trim_end)
240        .collect::<Vec<_>>()
241        .join("\n")
242}
243
244#[cfg(test)]
245mod tests {
246    use super::*;
247
248    #[test]
249    fn pytest_no_module_error_does_not_compress_to_empty() {
250        let output = "/usr/bin/python3: No module named pytest\n";
251
252        let compressed = PytestCompressor.compress("python3 -m pytest", output);
253
254        assert!(compressed.text.contains("No module named pytest"));
255    }
256
257    #[test]
258    fn pytest_internalerror_does_not_compress_to_empty() {
259        let output = "INTERNALERROR> Traceback (most recent call last):\nINTERNALERROR> RuntimeError: plugin exploded\n";
260
261        let compressed = PytestCompressor.compress("pytest", output);
262
263        assert!(compressed.text.contains("INTERNALERROR"));
264        assert!(compressed.text.contains("RuntimeError"));
265    }
266
267    #[test]
268    fn pytest_failure_section_keeps_unrecognized_equals_traceback_lines() {
269        let output = "============================= FAILURES =============================\n____________________________ test_example ____________________________\nTraceback (most recent call last):\n======= custom traceback divider =======\nException: boom\n=========================== short test summary info ===========================\nFAILED tests/test_example.py::test_example - Exception: boom\n";
270
271        let compressed = PytestCompressor.compress("pytest", output);
272
273        assert!(compressed
274            .text
275            .contains("======= custom traceback divider ======="));
276        assert!(compressed.text.contains("Exception: boom"));
277    }
278}