1use crate::compress::caps::{cap_classified_blocks, ClassifiedBlock, DropClass};
2use crate::compress::generic::GenericCompressor;
3use crate::compress::{CompressionResult, Compressor};
4
5pub struct PytestCompressor;
6
7impl Compressor for PytestCompressor {
8 fn matches(&self, command: &str) -> bool {
9 let tokens: Vec<&str> = command.split_whitespace().collect();
10 tokens.first().is_some_and(|head| *head == "pytest")
11 || tokens
12 .windows(3)
13 .any(|window| matches!(window, ["python" | "python3", "-m", "pytest"]))
14 }
15
16 fn compress_with_exit_code(
17 &self,
18 _command: &str,
19 output: &str,
20 _exit_code: Option<i32>,
21 ) -> CompressionResult {
22 compress_pytest(output)
23 }
24
25 fn matches_output(&self, output: &str) -> bool {
26 output.lines().any(|line| {
27 let trimmed = line.trim();
28 is_section_header(trimmed, "FAILURES")
29 || is_section_header(trimmed, "ERRORS")
30 || is_section_header(trimmed, "short test summary info")
31 || is_pytest_final_summary_signature(trimmed)
32 })
33 }
34}
35
36fn compress_pytest(output: &str) -> CompressionResult {
37 let lines: Vec<&str> = output.lines().collect();
38 let mut blocks = Vec::new();
39 let mut index = 0usize;
40
41 while index < lines.len() {
42 let line = lines[index];
43 let trimmed = line.trim();
44
45 if is_header_line(trimmed) || is_failure_or_error_test_line(trimmed) {
46 blocks.push(ClassifiedBlock::unclassified(line.to_string()));
47 index += 1;
48 continue;
49 }
50
51 if is_section_header(trimmed, "FAILURES") || is_section_header(trimmed, "ERRORS") {
52 let class = if is_section_header(trimmed, "ERRORS") {
53 DropClass::Error
54 } else {
55 DropClass::Failure
56 };
57 let (section_blocks, next_index) = compress_failure_section(&lines, index, class);
58 blocks.extend(section_blocks);
59 index = next_index;
60 continue;
61 }
62
63 if is_section_header(trimmed, "warnings summary") {
64 let (warnings, next_index) = compress_warnings(&lines, index);
65 blocks.extend(warnings);
66 index = next_index;
67 continue;
68 }
69
70 if is_section_header(trimmed, "short test summary info") || is_final_summary(trimmed) {
71 blocks.push(ClassifiedBlock::unclassified(line.to_string()));
72 index += 1;
73 continue;
74 }
75
76 if is_pass_status_line(trimmed) {
77 index += 1;
78 continue;
79 }
80
81 index += 1;
82 }
83
84 let capped = cap_classified_blocks(blocks);
85 let compressed = CompressionResult::with_class_drops(
86 trim_trailing_lines(&capped.text),
87 capped.dropped_by_class,
88 );
89 preserve_pytest_failure(output, compressed)
90}
91
92fn preserve_pytest_failure(output: &str, compressed: CompressionResult) -> CompressionResult {
93 let stripped_failure =
94 compressed.text.trim().is_empty() || !super::text_has_failure_signal(&compressed.text);
95 if !output.trim().is_empty() && super::text_has_failure_signal(output) && stripped_failure {
96 GenericCompressor::compress_output(output).into()
97 } else {
98 compressed
99 }
100}
101
102fn compress_failure_section(
103 lines: &[&str],
104 start: usize,
105 class: DropClass,
106) -> (Vec<ClassifiedBlock>, usize) {
107 let mut blocks = vec![ClassifiedBlock::unclassified(lines[start].to_string())];
108 let mut index = start + 1;
109 let mut current: Vec<String> = Vec::new();
110
111 while index < lines.len() {
112 let line = lines[index];
113 let trimmed = line.trim();
114 if is_recognized_section_boundary(trimmed) {
115 break;
116 }
117 if is_pytest_case_header(trimmed) && !current.is_empty() {
118 blocks.push(ClassifiedBlock::new(class, current.join("\n")));
119 current.clear();
120 }
121 current.push(line.to_string());
122 index += 1;
123 }
124
125 if !current.is_empty() {
126 blocks.push(ClassifiedBlock::new(class, current.join("\n")));
127 }
128
129 (blocks, index)
130}
131
132fn is_recognized_section_boundary(trimmed: &str) -> bool {
133 is_section_header(trimmed, "FAILURES")
134 || is_section_header(trimmed, "ERRORS")
135 || is_section_header(trimmed, "warnings summary")
136 || is_section_header(trimmed, "short test summary info")
137 || is_final_summary(trimmed)
138}
139
/// Returns `true` for lines that start a new test case inside a failure
/// section: a line framed by underscores, or one beginning with
/// `ERROR at `, `FAILED ` or `ERROR `.
fn is_pytest_case_header(trimmed: &str) -> bool {
    const CASE_PREFIXES: [&str; 3] = ["ERROR at ", "FAILED ", "ERROR "];

    let underscore_banner = trimmed.starts_with('_') && trimmed.ends_with('_');
    underscore_banner
        || CASE_PREFIXES
            .iter()
            .any(|prefix| trimmed.starts_with(*prefix))
}
146
/// Returns `true` for session header lines, i.e. lines starting with
/// `platform `, `rootdir:` or `collected `.
fn is_header_line(trimmed: &str) -> bool {
    const HEADER_PREFIXES: [&str; 3] = ["platform ", "rootdir:", "collected "];

    HEADER_PREFIXES
        .iter()
        .any(|prefix| trimmed.starts_with(*prefix))
}
152
/// Returns `true` for a per-test result line reporting a failure or error,
/// i.e. a line containing ` FAILED` or ` ERROR` (with the leading space).
///
/// Lines framed by `=` on both ends are section banners, not test lines, and
/// are rejected. Without this guard the `==== ERRORS ====` banner matches
/// ` ERROR`, gets copied through as a plain line, and the ERRORS section is
/// never handed to the section parser.
fn is_failure_or_error_test_line(trimmed: &str) -> bool {
    let is_banner = trimmed.starts_with('=') && trimmed.ends_with('=');
    if is_banner {
        return false;
    }

    // `contains` already covers the end-of-line case, so no separate
    // `ends_with` checks are needed.
    trimmed.contains(" FAILED") || trimmed.contains(" ERROR")
}
159
/// Returns `true` when `trimmed` both starts and ends with `=` and contains
/// `name` somewhere in between.
fn is_section_header(trimmed: &str, name: &str) -> bool {
    let framed_by_equals = trimmed.starts_with('=') && trimmed.ends_with('=');
    framed_by_equals && trimmed.contains(name)
}
163
/// Returns `true` for lines that only report passing (or skipped/xfail)
/// progress: a non-empty line made solely of `.`, `s`, `x`, `X` characters,
/// or a line ending in ` PASSED` or containing ` PASSED `.
fn is_pass_status_line(trimmed: &str) -> bool {
    if trimmed.is_empty() {
        return false;
    }

    let only_progress_marks = trimmed
        .chars()
        .all(|mark| matches!(mark, '.' | 's' | 'x' | 'X'));

    only_progress_marks || trimmed.ends_with(" PASSED") || trimmed.contains(" PASSED ")
}
172
/// Returns `true` when `trimmed` looks like pytest's closing summary line,
/// e.g. `=== 1 failed, 2 passed in 0.12s ===`.
///
/// The line must be framed by `=`, mention one of the words `passed`,
/// `failed`, `error` or `errors`, and the token following the last ` in `
/// must be a duration made of digits and dots with an `s` suffix.
fn is_pytest_final_summary_signature(trimmed: &str) -> bool {
    let framed_by_equals = trimmed.starts_with('=') && trimmed.ends_with('=');
    if !framed_by_equals {
        return false;
    }

    let body = trimmed.trim_matches('=').trim();

    // Split on anything that is not an ASCII letter so that counts and
    // punctuation ("1 failed,") do not get in the way of the word match.
    let mentions_outcome = body
        .split(|ch: char| !ch.is_ascii_alphabetic())
        .any(|word| ["passed", "failed", "error", "errors"].contains(&word));
    if !mentions_outcome {
        return false;
    }

    body.rsplit_once(" in ")
        .and_then(|(_, tail)| tail.split_whitespace().next())
        .and_then(|token| token.strip_suffix('s'))
        .is_some_and(|seconds| {
            !seconds.is_empty() && seconds.chars().all(|ch| ch.is_ascii_digit() || ch == '.')
        })
}
195
/// Returns `true` for a line framed by `=` that contains one of the outcome
/// markers ` passed`, ` failed`, ` error`, ` skipped` or ` xfailed`.
fn is_final_summary(trimmed: &str) -> bool {
    const OUTCOME_MARKERS: [&str; 5] = [" passed", " failed", " error", " skipped", " xfailed"];

    let framed_by_equals = trimmed.starts_with('=') && trimmed.ends_with('=');
    if !framed_by_equals {
        return false;
    }

    OUTCOME_MARKERS
        .iter()
        .any(|marker| trimmed.contains(*marker))
}
205
206fn compress_warnings(lines: &[&str], start: usize) -> (Vec<ClassifiedBlock>, usize) {
207 let mut blocks = vec![ClassifiedBlock::unclassified(lines[start].to_string())];
208 let mut index = start + 1;
209 let mut current: Vec<String> = Vec::new();
210
211 while index < lines.len() {
212 let line = lines[index];
213 let trimmed = line.trim();
214 if trimmed.starts_with('=') && trimmed.ends_with('=') {
215 break;
216 }
217 if is_warning_entry(trimmed) && !current.is_empty() {
218 blocks.push(ClassifiedBlock::new(DropClass::Warning, current.join("\n")));
219 current.clear();
220 }
221 current.push(line.to_string());
222 index += 1;
223 }
224
225 if !current.is_empty() {
226 blocks.push(ClassifiedBlock::new(DropClass::Warning, current.join("\n")));
227 }
228
229 (blocks, index)
230}
231
/// Returns `true` for lines that start a new warning entry: a line beginning
/// with `tests/`, or one containing `Warning:` or `warning:`.
fn is_warning_entry(trimmed: &str) -> bool {
    if trimmed.starts_with("tests/") {
        return true;
    }

    ["Warning:", "warning:"]
        .iter()
        .any(|marker| trimmed.contains(*marker))
}
235
/// Strips trailing whitespace from every line of `input` and re-joins the
/// lines with `\n`. The result carries no final newline.
fn trim_trailing_lines(input: &str) -> String {
    let mut cleaned = String::with_capacity(input.len());

    for (position, line) in input.lines().enumerate() {
        if position > 0 {
            cleaned.push('\n');
        }
        cleaned.push_str(line.trim_end());
    }

    cleaned
}
243
#[cfg(test)]
mod tests {
    use super::*;

    /// Output with no pytest structure at all must not be compressed away.
    #[test]
    fn pytest_no_module_error_does_not_compress_to_empty() {
        let raw = "/usr/bin/python3: No module named pytest\n";

        let result = PytestCompressor.compress("python3 -m pytest", raw);

        assert!(
            result.text.contains("No module named pytest"),
            "the missing-module message must be kept"
        );
    }

    /// INTERNALERROR tracebacks must be kept.
    #[test]
    fn pytest_internalerror_does_not_compress_to_empty() {
        let raw = "INTERNALERROR> Traceback (most recent call last):\nINTERNALERROR> RuntimeError: plugin exploded\n";

        let result = PytestCompressor.compress("pytest", raw);

        for fragment in ["INTERNALERROR", "RuntimeError"] {
            assert!(
                result.text.contains(fragment),
                "expected compressed text to contain {fragment:?}"
            );
        }
    }

    /// An `=`-framed line that is not a known banner must stay inside the
    /// failure block instead of ending the section.
    #[test]
    fn pytest_failure_section_keeps_unrecognized_equals_traceback_lines() {
        let raw = "============================= FAILURES =============================\n____________________________ test_example ____________________________\nTraceback (most recent call last):\n======= custom traceback divider =======\nException: boom\n=========================== short test summary info ===========================\nFAILED tests/test_example.py::test_example - Exception: boom\n";

        let result = PytestCompressor.compress("pytest", raw);

        for fragment in ["======= custom traceback divider =======", "Exception: boom"] {
            assert!(
                result.text.contains(fragment),
                "expected compressed text to contain {fragment:?}"
            );
        }
    }
}