Skip to main content

aft/compress/
pytest.rs

1use crate::compress::caps::{cap_classified_blocks, ClassifiedBlock, DropClass};
2use crate::compress::{CompressionResult, Compressor};
3
/// Compressor for pytest runs.
///
/// Its `Compressor` implementation recognises pytest invocations (`pytest ...` or
/// `python -m pytest ...`) as well as output that carries pytest section banners,
/// and condenses that output via `compress_pytest`.
pub struct PytestCompressor;
5
6impl Compressor for PytestCompressor {
7    fn matches(&self, command: &str) -> bool {
8        let tokens: Vec<&str> = command.split_whitespace().collect();
9        tokens.first().is_some_and(|head| *head == "pytest")
10            || tokens
11                .windows(3)
12                .any(|window| matches!(window, ["python" | "python3", "-m", "pytest"]))
13    }
14
15    fn compress_with_exit_code(
16        &self,
17        _command: &str,
18        output: &str,
19        _exit_code: Option<i32>,
20    ) -> CompressionResult {
21        compress_pytest(output)
22    }
23
24    fn matches_output(&self, output: &str) -> bool {
25        output.lines().any(|line| {
26            let trimmed = line.trim();
27            is_section_header(trimmed, "FAILURES")
28                || is_section_header(trimmed, "ERRORS")
29                || is_section_header(trimmed, "short test summary info")
30                || is_pytest_final_summary_signature(trimmed)
31        })
32    }
33}
34
35fn compress_pytest(output: &str) -> CompressionResult {
36    let lines: Vec<&str> = output.lines().collect();
37    let mut blocks = Vec::new();
38    let mut index = 0usize;
39
40    while index < lines.len() {
41        let line = lines[index];
42        let trimmed = line.trim();
43
44        if is_header_line(trimmed) || is_failure_or_error_test_line(trimmed) {
45            blocks.push(ClassifiedBlock::unclassified(line.to_string()));
46            index += 1;
47            continue;
48        }
49
50        if is_section_header(trimmed, "FAILURES") || is_section_header(trimmed, "ERRORS") {
51            let class = if is_section_header(trimmed, "ERRORS") {
52                DropClass::Error
53            } else {
54                DropClass::Failure
55            };
56            let (section_blocks, next_index) = compress_failure_section(&lines, index, class);
57            blocks.extend(section_blocks);
58            index = next_index;
59            continue;
60        }
61
62        if is_section_header(trimmed, "warnings summary") {
63            let (warnings, next_index) = compress_warnings(&lines, index);
64            blocks.extend(warnings);
65            index = next_index;
66            continue;
67        }
68
69        if is_section_header(trimmed, "short test summary info") || is_final_summary(trimmed) {
70            blocks.push(ClassifiedBlock::unclassified(line.to_string()));
71            index += 1;
72            continue;
73        }
74
75        if is_pass_status_line(trimmed) {
76            index += 1;
77            continue;
78        }
79
80        index += 1;
81    }
82
83    let capped = cap_classified_blocks(blocks);
84    CompressionResult::with_class_drops(trim_trailing_lines(&capped.text), capped.dropped_by_class)
85}
86
87fn compress_failure_section(
88    lines: &[&str],
89    start: usize,
90    class: DropClass,
91) -> (Vec<ClassifiedBlock>, usize) {
92    let mut blocks = vec![ClassifiedBlock::unclassified(lines[start].to_string())];
93    let mut index = start + 1;
94    let mut current: Vec<String> = Vec::new();
95
96    while index < lines.len() {
97        let line = lines[index];
98        let trimmed = line.trim();
99        if trimmed.starts_with('=') && trimmed.ends_with('=') {
100            break;
101        }
102        if is_pytest_case_header(trimmed) && !current.is_empty() {
103            blocks.push(ClassifiedBlock::new(class, current.join("\n")));
104            current.clear();
105        }
106        current.push(line.to_string());
107        index += 1;
108    }
109
110    if !current.is_empty() {
111        blocks.push(ClassifiedBlock::new(class, current.join("\n")));
112    }
113
114    (blocks, index)
115}
116
/// Returns `true` when `trimmed` opens a new test case inside a failure or
/// error section: either an underscore-framed banner (`____ name ____`) or a
/// line beginning with `FAILED ` / `ERROR `.
fn is_pytest_case_header(trimmed: &str) -> bool {
    let underscore_banner = trimmed.starts_with('_') && trimmed.ends_with('_');

    // "ERROR " also covers the "ERROR at ..." form.
    underscore_banner
        || ["FAILED ", "ERROR "]
            .iter()
            .any(|prefix| trimmed.starts_with(*prefix))
}
123
/// Returns `true` for the session preamble lines that are kept verbatim:
/// those starting with `platform `, `rootdir:` or `collected `.
fn is_header_line(trimmed: &str) -> bool {
    const PREAMBLE_PREFIXES: [&str; 3] = ["platform ", "rootdir:", "collected "];

    PREAMBLE_PREFIXES
        .iter()
        .any(|prefix| trimmed.starts_with(*prefix))
}
129
/// Returns `true` for a per-test result line that reports a failure or an
/// error, i.e. a line containing ` FAILED` or ` ERROR` (for example
/// `tests/test_a.py::test_x FAILED [ 50%]`).
///
/// `=`-framed banners are never test lines and always yield `false`.
fn is_failure_or_error_test_line(trimmed: &str) -> bool {
    // The `==== ERRORS ====` banner contains " ERROR". `compress_pytest` tests
    // this predicate before its section-header checks, so without this guard
    // the banner would be kept as a plain line and the ERRORS section would
    // never be handed to the section compressor.
    let is_banner = trimmed.starts_with('=') && trimmed.ends_with('=');
    if is_banner {
        return false;
    }

    // `contains` already covers the case where the marker ends the line.
    trimmed.contains(" FAILED") || trimmed.contains(" ERROR")
}
136
/// Returns `true` when `trimmed` is framed by `=` on both ends and contains
/// `name` somewhere in between (or anywhere in the line).
fn is_section_header(trimmed: &str, name: &str) -> bool {
    let framed = trimmed.starts_with('=') && trimmed.ends_with('=');
    framed && trimmed.contains(name)
}
140
/// Returns `true` for output that only reports non-failing progress: a
/// non-empty line made up solely of the characters `.`, `s`, `x`, `X`, or a
/// line that ends with ` PASSED` or contains ` PASSED `.
fn is_pass_status_line(trimmed: &str) -> bool {
    if trimmed.is_empty() {
        return false;
    }

    let only_progress_marks = trimmed.chars().all(|mark| ".sxX".contains(mark));
    only_progress_marks || trimmed.ends_with(" PASSED") || trimmed.contains(" PASSED ")
}
149
/// Strict check for pytest's closing summary banner, e.g.
/// `=== 1 failed, 2 passed in 0.12s ===`.
///
/// All of the following must hold:
/// - the line is framed by `=` on both ends;
/// - the text between the frames contains the word `passed`, `failed`,
///   `error` or `errors` (words are runs of ASCII letters);
/// - after the last ` in ` the first whitespace-separated token ends in `s`,
///   and what precedes that `s` is non-empty and consists only of ASCII
///   digits and `.`.
fn is_pytest_final_summary_signature(trimmed: &str) -> bool {
    let framed = trimmed.starts_with('=') && trimmed.ends_with('=');
    if !framed {
        return false;
    }

    let body = trimmed.trim_matches('=').trim();

    let mentions_outcome = body
        .split(|ch: char| !ch.is_ascii_alphabetic())
        .any(|word| ["passed", "failed", "error", "errors"].contains(&word));
    if !mentions_outcome {
        return false;
    }

    body.rsplit_once(" in ")
        .and_then(|(_, tail)| tail.split_whitespace().next())
        .and_then(|token| token.strip_suffix('s'))
        .is_some_and(|number| {
            !number.is_empty() && number.chars().all(|ch| ch == '.' || ch.is_ascii_digit())
        })
}
172
/// Loose check for the closing summary banner: the line is framed by `=` and
/// contains at least one of ` passed`, ` failed`, ` error`, ` skipped` or
/// ` xfailed`.
fn is_final_summary(trimmed: &str) -> bool {
    const OUTCOME_MARKERS: [&str; 5] = [" passed", " failed", " error", " skipped", " xfailed"];

    let framed = trimmed.starts_with('=') && trimmed.ends_with('=');
    framed
        && OUTCOME_MARKERS
            .iter()
            .any(|marker| trimmed.contains(*marker))
}
182
183fn compress_warnings(lines: &[&str], start: usize) -> (Vec<ClassifiedBlock>, usize) {
184    let mut blocks = vec![ClassifiedBlock::unclassified(lines[start].to_string())];
185    let mut index = start + 1;
186    let mut current: Vec<String> = Vec::new();
187
188    while index < lines.len() {
189        let line = lines[index];
190        let trimmed = line.trim();
191        if trimmed.starts_with('=') && trimmed.ends_with('=') {
192            break;
193        }
194        if is_warning_entry(trimmed) && !current.is_empty() {
195            blocks.push(ClassifiedBlock::new(DropClass::Warning, current.join("\n")));
196            current.clear();
197        }
198        current.push(line.to_string());
199        index += 1;
200    }
201
202    if !current.is_empty() {
203        blocks.push(ClassifiedBlock::new(DropClass::Warning, current.join("\n")));
204    }
205
206    (blocks, index)
207}
208
/// Returns `true` when `trimmed` starts a new warning entry: it begins with
/// `tests/`, or contains `Warning:` or `warning:`.
fn is_warning_entry(trimmed: &str) -> bool {
    if trimmed.starts_with("tests/") {
        return true;
    }

    ["Warning:", "warning:"]
        .iter()
        .any(|marker| trimmed.contains(*marker))
}
212
/// Strips trailing whitespace from every line of `input` and rejoins the lines
/// with `\n`.
///
/// Because the text is split with `str::lines`, a final line terminator is not
/// reproduced in the result; interior blank lines are preserved.
fn trim_trailing_lines(input: &str) -> String {
    let mut cleaned = String::with_capacity(input.len());

    for (position, line) in input.lines().enumerate() {
        if position > 0 {
            cleaned.push('\n');
        }
        cleaned.push_str(line.trim_end());
    }

    cleaned
}