// lean_ctx/core/patterns/pytest.rs
pytest.rs1pub fn compress(command: &str, output: &str) -> Option<String> {
9 let is_pytest_cmd = command.contains("pytest") || command.contains("py.test");
11 let has_verbose_markers =
12 (output.contains("::") && output.contains(" PASSED")) || output.contains(" FAILED");
13 let has_session = output.contains("test session starts");
14
15 if !is_pytest_cmd && !has_verbose_markers && !has_session {
16 return None;
17 }
18
19 let mut passed: Vec<String> = Vec::new();
20 let mut failed: Vec<String> = Vec::new();
21 let mut skipped = 0u32;
22 let mut errors = 0u32;
23 let mut xfailed = 0u32;
24 let mut xpassed = 0u32;
25 let mut warnings = 0u32;
26 let mut duration = String::new();
27 let mut failure_details: Vec<String> = Vec::new();
28 let mut in_failure_block = false;
29 let mut current_failure: Vec<String> = Vec::new();
30
31 for line in output.lines() {
32 let trimmed = line.trim();
33
34 if trimmed.is_empty() {
36 if in_failure_block && !current_failure.is_empty() {
37 current_failure.push(String::new());
38 }
39 continue;
40 }
41
42 if trimmed.starts_with("SETUP")
44 || trimmed.starts_with("TEARDOWN")
45 || trimmed.contains("--- fixtures ---")
46 || trimmed.starts_with("---------- fixtures")
47 {
48 continue;
49 }
50
51 if trimmed.starts_with("collecting ")
53 || trimmed.starts_with("collected ")
54 || trimmed.starts_with("<Module ")
55 || trimmed.starts_with("<Class ")
56 || trimmed.starts_with("<Function ")
57 || trimmed.starts_with("platform ")
58 || trimmed.starts_with("rootdir:")
59 || trimmed.starts_with("configfile:")
60 || trimmed.starts_with("plugins:")
61 || trimmed.starts_with("cachedir:")
62 {
63 continue;
64 }
65
66 if trimmed.contains("test session starts")
68 || (trimmed.starts_with('=')
69 && trimmed.ends_with('=')
70 && trimmed.len() > 3
71 && !trimmed.contains("passed")
72 && !trimmed.contains("failed")
73 && !trimmed.contains("error"))
74 {
75 continue;
76 }
77
78 if trimmed.contains("::") {
81 match extract_status(trimmed) {
82 Some("PASSED") => {
83 let name = extract_test_name(trimmed);
84 passed.push(name);
85 in_failure_block = false;
86 continue;
87 }
88 Some("FAILED") => {
89 let name = extract_test_name(trimmed);
90 failed.push(name);
91 in_failure_block = false;
92 continue;
93 }
94 Some("SKIPPED") => {
95 skipped += 1;
96 in_failure_block = false;
97 continue;
98 }
99 Some("XFAIL") => {
100 xfailed += 1;
101 in_failure_block = false;
102 continue;
103 }
104 Some("XPASS") => {
105 xpassed += 1;
106 in_failure_block = false;
107 continue;
108 }
109 Some("ERROR") => {
110 errors += 1;
111 in_failure_block = false;
112 continue;
113 }
114 _ => {}
115 }
116 }
117
118 if (trimmed.starts_with("___") && trimmed.ends_with("___"))
120 || trimmed.starts_with("FAILED ")
121 {
122 if !current_failure.is_empty() {
124 let detail = current_failure.join("\n");
125 if !detail.trim().is_empty() {
126 failure_details.push(detail);
127 }
128 current_failure.clear();
129 }
130 in_failure_block = true;
131 continue;
132 }
133
134 if in_failure_block {
136 if current_failure.len() < 5 {
137 current_failure.push(trimmed.to_string());
138 }
139 continue;
140 }
141
142 if (trimmed.starts_with('=') || trimmed.starts_with('-'))
144 && (trimmed.contains("passed")
145 || trimmed.contains("failed")
146 || trimmed.contains("error"))
147 {
148 if let Some(d) = extract_duration(trimmed) {
149 duration = d;
150 }
151 if let Some(n) = extract_counter(trimmed, " passed") {
153 if passed.is_empty() && n > 0 {
154 for _ in 0..n {
156 passed.push(String::new());
157 }
158 }
159 }
160 if let Some(n) = extract_counter(trimmed, " failed") {
161 if failed.is_empty() && n > 0 {
162 for _ in 0..n {
163 failed.push(String::new());
164 }
165 }
166 }
167 if let Some(n) = extract_counter(trimmed, " skipped") {
168 if skipped == 0 {
169 skipped = n;
170 }
171 }
172 if let Some(n) = extract_counter(trimmed, " xfailed") {
173 if xfailed == 0 {
174 xfailed = n;
175 }
176 }
177 if let Some(n) = extract_counter(trimmed, " xpassed") {
178 if xpassed == 0 {
179 xpassed = n;
180 }
181 }
182 if let Some(n) = extract_counter(trimmed, " warning") {
183 warnings = n;
184 }
185 if let Some(n) = extract_counter(trimmed, " error") {
186 if errors == 0 {
187 errors = n;
188 }
189 }
190 }
191 }
192
193 if !current_failure.is_empty() {
195 let detail = current_failure.join("\n");
196 if !detail.trim().is_empty() {
197 failure_details.push(detail);
198 }
199 }
200
201 let passed_count = passed.len() as u32;
202 let failed_count = failed.len() as u32;
203
204 if passed_count == 0 && failed_count == 0 && errors == 0 {
205 return None;
206 }
207
208 let mut result = String::from("pytest: ");
210
211 if failed_count == 0 && errors == 0 {
212 result.push_str(&format!("✓ {passed_count} passed"));
213 } else {
214 result.push_str(&format!("{passed_count} passed, {failed_count} failed"));
215 }
216
217 if skipped > 0 {
218 result.push_str(&format!(", {skipped} skipped"));
219 }
220 if xfailed > 0 {
221 result.push_str(&format!(", {xfailed} xfailed"));
222 }
223 if xpassed > 0 {
224 result.push_str(&format!(", {xpassed} xpassed"));
225 }
226 if errors > 0 {
227 result.push_str(&format!(", {errors} errors"));
228 }
229 if warnings > 0 {
230 result.push_str(&format!(", {warnings} warnings"));
231 }
232
233 if !duration.is_empty() {
234 result.push_str(&format!(" in {duration}"));
235 }
236
237 let named_passed: Vec<&String> = passed.iter().filter(|s| !s.is_empty()).collect();
239 if !named_passed.is_empty() && named_passed.len() <= 10 {
240 let names: Vec<&str> = named_passed.iter().map(|s| s.as_str()).collect();
241 result.push_str(&format!("\n ran: {}", names.join(", ")));
242 }
243
244 let named_failures: Vec<&String> = failed.iter().filter(|s| !s.is_empty()).collect();
246 if !named_failures.is_empty() {
247 for f in named_failures.iter().take(5) {
248 result.push_str(&format!("\n FAIL: {f}"));
249 }
250 if named_failures.len() > 5 {
251 result.push_str(&format!("\n ...+{} more", named_failures.len() - 5));
252 }
253 }
254
255 if !failure_details.is_empty() {
257 for detail in failure_details.iter().take(3) {
258 let short: String = detail.lines().take(3).collect::<Vec<_>>().join("\n");
259 result.push_str(&format!("\n > {short}"));
260 }
261 }
262
263 Some(result)
264}
265
/// Returns the pytest outcome keyword that terminates a verbose result line.
///
/// A trailing progress marker such as `[ 33%]` (the last `[` on the line whose
/// remainder contains `%`) is ignored before looking at the line ending.
/// Yields `None` when the line does not end with a known keyword.
fn extract_status(line: &str) -> Option<&'static str> {
    const KNOWN: [&str; 6] = ["PASSED", "FAILED", "SKIPPED", "XFAIL", "XPASS", "ERROR"];

    let without_progress = match line.rfind('[') {
        Some(idx) if line[idx..].contains('%') => &line[..idx],
        _ => line,
    };
    let core = without_progress.trim();

    KNOWN.iter().copied().find(|&status| core.ends_with(status))
}
284
/// Extracts a short test identifier from a verbose result line.
///
/// Steps: drop a trailing progress marker (`[ nn%]`), drop the last
/// space-separated word (the status keyword), then keep only what follows the
/// last `/` so the directory part of the path is removed.
fn extract_test_name(line: &str) -> String {
    let entry = line.trim();

    let sans_progress = match entry.rfind('[') {
        Some(idx) if entry[idx..].contains('%') => entry[..idx].trim(),
        _ => entry,
    };

    let node_id = match sans_progress.rsplit_once(' ') {
        Some((head, _status)) => head.trim(),
        None => sans_progress,
    };

    let start = node_id.rfind('/').map_or(0, |slash| slash + 1);
    node_id[start..].to_string()
}
314
/// Extracts the run duration (e.g. `0.42s`) from a pytest result line.
///
/// Looks for ` in ` followed by a digit and returns the run of digits, `.`,
/// `s` and `m` that starts there. Requiring a leading digit keeps ordinary
/// words after "in" (for example `"1 error in setup"`) from being reported as
/// a duration, and every occurrence of ` in ` is tried so an earlier textual
/// match does not hide the real one.
///
/// Returns `None` when no ` in <digit>` sequence is present.
fn extract_duration(line: &str) -> Option<String> {
    const MARKER: &str = " in ";

    line.match_indices(MARKER)
        .map(|(pos, _)| &line[pos + MARKER.len()..])
        .find(|rest| rest.starts_with(|c: char| c.is_ascii_digit()))
        .map(|rest| {
            rest.chars()
                .take_while(|c| c.is_ascii_digit() || matches!(*c, '.' | 's' | 'm'))
                .collect::<String>()
        })
}
330
/// Reads the number that precedes the first occurrence of `keyword` in `line`.
///
/// The whitespace-separated token directly before the keyword is taken, every
/// non-digit character in it is discarded, and the rest is parsed as `u32`.
/// Returns `None` when the keyword is absent, nothing precedes it, or the
/// token holds no parsable digits.
fn extract_counter(line: &str, keyword: &str) -> Option<u32> {
    let (prefix, _) = line.split_once(keyword)?;
    let token = prefix.split_whitespace().next_back()?;
    let digits: String = token.chars().filter(|c| c.is_ascii_digit()).collect();
    digits.parse().ok()
}
338
// Unit tests for `compress`, driven by captured pytest console output.
// Each fixture is a string literal fed through `compress`; the assertions
// check which fragments must (or must not) appear in the summary.
#[cfg(test)]
mod tests {
    use super::*;

    // Verbose run where every test passes: the summary must report the pass
    // count and the duration, and must not leak environment/collection lines.
    #[test]
    fn verbose_all_passed() {
        let output = "\
============================= test session starts ==============================
platform linux -- Python 3.11.5, pytest-7.4.3, pluggy-1.3.0
rootdir: /home/user/project
configfile: pyproject.toml
plugins: cov-4.1.0
collecting ... collected 3 items

tests/test_math.py::test_add PASSED [ 33%]
tests/test_math.py::test_subtract PASSED [ 66%]
tests/test_math.py::test_multiply PASSED [100%]

============================== 3 passed in 0.42s ===============================";

        let result = compress("pytest -v", output).expect("should compress");
        assert!(result.contains("✓ 3 passed"));
        assert!(result.contains("0.42s"));
        assert!(!result.contains("rootdir"));
        assert!(!result.contains("collecting"));
        assert!(!result.contains("platform"));
    }

    // Verbose run mixing passed, failed and skipped tests: every outcome count
    // must be reported and the failing test must be listed by name.
    #[test]
    fn verbose_mixed_results() {
        let output = "\
============================= test session starts ==============================
platform linux -- Python 3.11.5, pytest-7.4.3
collected 4 items

tests/test_auth.py::test_login PASSED [ 25%]
tests/test_auth.py::test_logout PASSED [ 50%]
tests/test_auth.py::test_expired_token FAILED [ 75%]
tests/test_auth.py::test_refresh SKIPPED [100%]

=========================== short test summary info ============================
FAILED tests/test_auth.py::test_expired_token
============================== 1 failed, 2 passed, 1 skipped in 1.23s ===============================";

        let result = compress("pytest -v", output).expect("should compress");
        assert!(result.contains("2 passed"));
        assert!(result.contains("1 failed"));
        assert!(result.contains("1 skipped"));
        assert!(result.contains("FAIL:"));
        assert!(result.contains("test_expired_token"));
    }

    // `--setup-show` output: SETUP/TEARDOWN fixture trace lines must not
    // appear in the summary.
    #[test]
    fn strips_fixture_lines() {
        let output = "\
============================= test session starts ==============================
collected 2 items

SETUP S session_fixture
tests/test_db.py::test_insert PASSED [ 50%]
TEARDOWN S session_fixture
tests/test_db.py::test_query PASSED [100%]

============================== 2 passed in 0.31s ===============================";

        let result = compress("pytest -v --setup-show", output).expect("should compress");
        assert!(result.contains("✓ 2 passed"));
        assert!(!result.contains("SETUP"));
        assert!(!result.contains("TEARDOWN"));
    }

    // Collection tree lines (`<Module ...>`, `<Class ...>`, `<Function ...>`)
    // and the "collecting" line must be dropped from the summary.
    #[test]
    fn strips_collection_lines() {
        let output = "\
============================= test session starts ==============================
platform linux -- Python 3.11.5
collecting ... collected 5 items
<Module tests/test_api.py>
 <Class TestUsers>
 <Function test_list>
 <Function test_create>

tests/test_api.py::TestUsers::test_list PASSED [ 20%]
tests/test_api.py::TestUsers::test_create PASSED [ 40%]
tests/test_api.py::TestUsers::test_delete PASSED [ 60%]
tests/test_api.py::TestUsers::test_update PASSED [ 80%]
tests/test_api.py::TestUsers::test_get PASSED [100%]

============================== 5 passed in 2.10s ===============================";

        let result = compress("pytest -v --collect-only", output).expect("should compress");
        assert!(result.contains("✓ 5 passed"));
        assert!(!result.contains("<Module"));
        assert!(!result.contains("<Class"));
        assert!(!result.contains("<Function"));
        assert!(!result.contains("collecting"));
    }

    // Neither the command nor the output looks like pytest, so `compress`
    // must decline with `None`.
    #[test]
    fn non_pytest_returns_none() {
        let output = "Hello world\nThis is not pytest output\n";
        assert!(compress("echo hello", output).is_none());
    }

    // Failing run that includes a FAILURES section with a traceback: counts
    // must be reported and the failing test must be listed by name.
    #[test]
    fn failure_with_traceback() {
        let output = "\
============================= test session starts ==============================
collected 2 items

tests/test_calc.py::test_divide PASSED [ 50%]
tests/test_calc.py::test_divide_zero FAILED [100%]

=================================== FAILURES ===================================
___________________________ test_divide_zero ___________________________________

 def test_divide_zero():
> assert divide(1, 0) == 0
E ZeroDivisionError: division by zero

src/calc.py:10: ZeroDivisionError
=========================== short test summary info ============================
FAILED tests/test_calc.py::test_divide_zero
============================== 1 failed, 1 passed in 0.15s ===============================";

        let result = compress("pytest -v --tb=short", output).expect("should compress");
        assert!(result.contains("1 passed"));
        assert!(result.contains("1 failed"));
        assert!(result.contains("FAIL:"));
        assert!(result.contains("test_divide_zero"));
    }
}