Skip to main content

aft/compress/
pytest.rs

1use crate::compress::caps::{cap_classified_blocks, ClassifiedBlock, DropClass};
2use crate::compress::{CompressionResult, Compressor};
3
4pub struct PytestCompressor;
5
6impl Compressor for PytestCompressor {
7    fn matches(&self, command: &str) -> bool {
8        let tokens: Vec<&str> = command.split_whitespace().collect();
9        tokens.first().is_some_and(|head| *head == "pytest")
10            || tokens
11                .windows(3)
12                .any(|window| matches!(window, ["python" | "python3", "-m", "pytest"]))
13    }
14
15    fn compress(&self, _command: &str, output: &str) -> CompressionResult {
16        compress_pytest(output)
17    }
18
19    fn matches_output(&self, output: &str) -> bool {
20        output.lines().any(|line| {
21            let trimmed = line.trim();
22            is_section_header(trimmed, "FAILURES")
23                || is_section_header(trimmed, "ERRORS")
24                || is_section_header(trimmed, "short test summary info")
25                || is_pytest_final_summary_signature(trimmed)
26        })
27    }
28}
29
30fn compress_pytest(output: &str) -> CompressionResult {
31    let lines: Vec<&str> = output.lines().collect();
32    let mut blocks = Vec::new();
33    let mut index = 0usize;
34
35    while index < lines.len() {
36        let line = lines[index];
37        let trimmed = line.trim();
38
39        if is_header_line(trimmed) || is_failure_or_error_test_line(trimmed) {
40            blocks.push(ClassifiedBlock::unclassified(line.to_string()));
41            index += 1;
42            continue;
43        }
44
45        if is_section_header(trimmed, "FAILURES") || is_section_header(trimmed, "ERRORS") {
46            let class = if is_section_header(trimmed, "ERRORS") {
47                DropClass::Error
48            } else {
49                DropClass::Failure
50            };
51            let (section_blocks, next_index) = compress_failure_section(&lines, index, class);
52            blocks.extend(section_blocks);
53            index = next_index;
54            continue;
55        }
56
57        if is_section_header(trimmed, "warnings summary") {
58            let (warnings, next_index) = compress_warnings(&lines, index);
59            blocks.extend(warnings);
60            index = next_index;
61            continue;
62        }
63
64        if is_section_header(trimmed, "short test summary info") || is_final_summary(trimmed) {
65            blocks.push(ClassifiedBlock::unclassified(line.to_string()));
66            index += 1;
67            continue;
68        }
69
70        if is_pass_status_line(trimmed) {
71            index += 1;
72            continue;
73        }
74
75        index += 1;
76    }
77
78    let capped = cap_classified_blocks(blocks);
79    CompressionResult::with_class_drops(trim_trailing_lines(&capped.text), capped.dropped_by_class)
80}
81
82fn compress_failure_section(
83    lines: &[&str],
84    start: usize,
85    class: DropClass,
86) -> (Vec<ClassifiedBlock>, usize) {
87    let mut blocks = vec![ClassifiedBlock::unclassified(lines[start].to_string())];
88    let mut index = start + 1;
89    let mut current: Vec<String> = Vec::new();
90
91    while index < lines.len() {
92        let line = lines[index];
93        let trimmed = line.trim();
94        if trimmed.starts_with('=') && trimmed.ends_with('=') {
95            break;
96        }
97        if is_pytest_case_header(trimmed) && !current.is_empty() {
98            blocks.push(ClassifiedBlock::new(class, current.join("\n")));
99            current.clear();
100        }
101        current.push(line.to_string());
102        index += 1;
103    }
104
105    if !current.is_empty() {
106        blocks.push(ClassifiedBlock::new(class, current.join("\n")));
107    }
108
109    (blocks, index)
110}
111
/// Returns `true` when `trimmed` opens a new case inside a failure section.
///
/// A case header is either an underscore-framed banner that carries a name
/// (for example `____ test_foo ____`), or a line starting with `FAILED ` or
/// `ERROR ` (the latter also covers `ERROR at ...`).
///
/// Lines made up solely of underscores and whitespace are rejected: pytest
/// prints `_ _ _ _ _` between traceback frames of a single failure, and
/// treating that separator as a header would split one failure into several
/// blocks.
fn is_pytest_case_header(trimmed: &str) -> bool {
    let underscore_banner = trimmed.starts_with('_')
        && trimmed.ends_with('_')
        && trimmed
            .chars()
            .any(|ch| ch != '_' && !ch.is_whitespace());

    underscore_banner || trimmed.starts_with("FAILED ") || trimmed.starts_with("ERROR ")
}
118
/// Returns `true` for session header lines: those beginning with
/// `platform `, `rootdir:` or `collected `.
fn is_header_line(trimmed: &str) -> bool {
    const HEADER_PREFIXES: [&str; 3] = ["platform ", "rootdir:", "collected "];
    HEADER_PREFIXES
        .iter()
        .any(|prefix| trimmed.starts_with(*prefix))
}
124
/// Returns `true` when `trimmed` reports a failed or errored test, i.e. it
/// contains ` FAILED` or ` ERROR` preceded by a space.
///
/// Lines framed by `=` are section banners, not per-test lines, and are
/// rejected. Without this the `==== ERRORS ====` banner would match through
/// its ` ERROR` substring and be mistaken for a test result.
fn is_failure_or_error_test_line(trimmed: &str) -> bool {
    let is_banner = trimmed.starts_with('=') && trimmed.ends_with('=');
    if is_banner {
        return false;
    }
    // `contains` already covers the suffix case, so no separate `ends_with`
    // check is needed.
    trimmed.contains(" FAILED") || trimmed.contains(" ERROR")
}
131
/// Returns `true` when `trimmed` both starts and ends with `=` and contains
/// `name` somewhere in between.
fn is_section_header(trimmed: &str, name: &str) -> bool {
    let framed_by_equals = trimmed.starts_with('=') && trimmed.ends_with('=');
    framed_by_equals && trimmed.contains(name)
}
135
/// Returns `true` for non-empty lines that carry no failure information:
/// either every character is one of `.`, `s`, `x`, `X`, or the line ends with
/// ` PASSED` / contains ` PASSED ` .
fn is_pass_status_line(trimmed: &str) -> bool {
    if trimmed.is_empty() {
        return false;
    }

    let only_progress_marks = trimmed
        .chars()
        .all(|mark| matches!(mark, '.' | 's' | 'x' | 'X'));

    only_progress_marks || trimmed.ends_with(" PASSED") || trimmed.contains(" PASSED ")
}
144
/// Strict test for the final summary line, e.g. `=== 1 failed, 2 passed in 0.12s ===`.
///
/// All of the following must hold:
/// * the line starts and ends with `=`;
/// * after stripping the `=` frame, one of the words `passed`, `failed`,
///   `error` or `errors` appears (words are maximal runs of ASCII letters);
/// * the text after the last ` in ` begins with a token of the form
///   `<digits and dots>s`, with at least one character before the `s`.
fn is_pytest_final_summary_signature(trimmed: &str) -> bool {
    const STATUS_WORDS: [&str; 4] = ["passed", "failed", "error", "errors"];

    if !(trimmed.starts_with('=') && trimmed.ends_with('=')) {
        return false;
    }

    let summary = trimmed.trim_matches('=').trim();
    let mentions_status = summary
        .split(|c: char| !c.is_ascii_alphabetic())
        .any(|word| STATUS_WORDS.contains(&word));
    if !mentions_status {
        return false;
    }

    summary
        .rsplit_once(" in ")
        .and_then(|(_, tail)| tail.split_whitespace().next())
        .and_then(|token| token.strip_suffix('s'))
        .is_some_and(|number| {
            !number.is_empty() && number.chars().all(|c| c.is_ascii_digit() || c == '.')
        })
}
167
/// Loose test for a summary line: it is framed by `=` and contains at least
/// one of ` passed`, ` failed`, ` error`, ` skipped` or ` xfailed`.
fn is_final_summary(trimmed: &str) -> bool {
    const OUTCOME_MARKERS: [&str; 5] = [" passed", " failed", " error", " skipped", " xfailed"];

    if !trimmed.starts_with('=') || !trimmed.ends_with('=') {
        return false;
    }
    OUTCOME_MARKERS
        .iter()
        .any(|marker| trimmed.contains(*marker))
}
177
178fn compress_warnings(lines: &[&str], start: usize) -> (Vec<ClassifiedBlock>, usize) {
179    let mut blocks = vec![ClassifiedBlock::unclassified(lines[start].to_string())];
180    let mut index = start + 1;
181    let mut current: Vec<String> = Vec::new();
182
183    while index < lines.len() {
184        let line = lines[index];
185        let trimmed = line.trim();
186        if trimmed.starts_with('=') && trimmed.ends_with('=') {
187            break;
188        }
189        if is_warning_entry(trimmed) && !current.is_empty() {
190            blocks.push(ClassifiedBlock::new(DropClass::Warning, current.join("\n")));
191            current.clear();
192        }
193        current.push(line.to_string());
194        index += 1;
195    }
196
197    if !current.is_empty() {
198        blocks.push(ClassifiedBlock::new(DropClass::Warning, current.join("\n")));
199    }
200
201    (blocks, index)
202}
203
/// Returns `true` when `trimmed` starts a new warning entry: it contains
/// `Warning:` or `warning:`, or begins with `tests/`.
fn is_warning_entry(trimmed: &str) -> bool {
    if trimmed.starts_with("tests/") {
        return true;
    }
    ["Warning:", "warning:"]
        .iter()
        .any(|needle| trimmed.contains(*needle))
}
207
/// Strips trailing whitespace from every line of `input` and rejoins the
/// lines with `\n`. Because the lines are rejoined, a final line terminator
/// in `input` is not reproduced.
fn trim_trailing_lines(input: &str) -> String {
    let mut cleaned = String::with_capacity(input.len());
    for (position, line) in input.lines().enumerate() {
        if position > 0 {
            cleaned.push('\n');
        }
        cleaned.push_str(line.trim_end());
    }
    cleaned
}