Skip to main content

lean_ctx/core/patterns/
pytest.rs

1/// Dedicated compression pattern for verbose pytest output (`pytest -v`, `pytest --tb=short`).
2///
3/// Handles:
4/// - Per-test PASSED/FAILED lines with full module paths → consolidated summary
5/// - Fixture setup/teardown lines → stripped
6/// - Collection lines (`collecting...`, `collected N items`) → stripped
7/// - Short tracebacks for failures → kept but trimmed
8pub fn compress(command: &str, output: &str) -> Option<String> {
9    // Only activate for pytest commands or output that looks like verbose pytest
10    let is_pytest_cmd = command.contains("pytest") || command.contains("py.test");
11    let has_verbose_markers =
12        (output.contains("::") && output.contains(" PASSED")) || output.contains(" FAILED");
13    let has_session = output.contains("test session starts");
14
15    if !is_pytest_cmd && !has_verbose_markers && !has_session {
16        return None;
17    }
18
19    let mut passed: Vec<String> = Vec::new();
20    let mut failed: Vec<String> = Vec::new();
21    let mut skipped = 0u32;
22    let mut errors = 0u32;
23    let mut xfailed = 0u32;
24    let mut xpassed = 0u32;
25    let mut warnings = 0u32;
26    let mut duration = String::new();
27    let mut failure_details: Vec<String> = Vec::new();
28    let mut in_failure_block = false;
29    let mut current_failure: Vec<String> = Vec::new();
30
31    for line in output.lines() {
32        let trimmed = line.trim();
33
34        // Skip empty lines
35        if trimmed.is_empty() {
36            if in_failure_block && !current_failure.is_empty() {
37                current_failure.push(String::new());
38            }
39            continue;
40        }
41
42        // Skip fixture setup/teardown lines
43        if trimmed.starts_with("SETUP")
44            || trimmed.starts_with("TEARDOWN")
45            || trimmed.contains("--- fixtures ---")
46            || trimmed.starts_with("---------- fixtures")
47        {
48            continue;
49        }
50
51        // Skip collection lines
52        if trimmed.starts_with("collecting ")
53            || trimmed.starts_with("collected ")
54            || trimmed.starts_with("<Module ")
55            || trimmed.starts_with("<Class ")
56            || trimmed.starts_with("<Function ")
57            || trimmed.starts_with("platform ")
58            || trimmed.starts_with("rootdir:")
59            || trimmed.starts_with("configfile:")
60            || trimmed.starts_with("plugins:")
61            || trimmed.starts_with("cachedir:")
62        {
63            continue;
64        }
65
66        // Skip session header
67        if trimmed.contains("test session starts")
68            || (trimmed.starts_with('=')
69                && trimmed.ends_with('=')
70                && trimmed.len() > 3
71                && !trimmed.contains("passed")
72                && !trimmed.contains("failed")
73                && !trimmed.contains("error"))
74        {
75            continue;
76        }
77
78        // Detect verbose per-test result lines: `path/test_file.py::test_name PASSED [ 75%]`
79        // The status may be followed by whitespace and a percentage indicator.
80        if trimmed.contains("::") {
81            match extract_status(trimmed) {
82                Some("PASSED") => {
83                    let name = extract_test_name(trimmed);
84                    passed.push(name);
85                    in_failure_block = false;
86                    continue;
87                }
88                Some("FAILED") => {
89                    let name = extract_test_name(trimmed);
90                    failed.push(name);
91                    in_failure_block = false;
92                    continue;
93                }
94                Some("SKIPPED") => {
95                    skipped += 1;
96                    in_failure_block = false;
97                    continue;
98                }
99                Some("XFAIL") => {
100                    xfailed += 1;
101                    in_failure_block = false;
102                    continue;
103                }
104                Some("XPASS") => {
105                    xpassed += 1;
106                    in_failure_block = false;
107                    continue;
108                }
109                Some("ERROR") => {
110                    errors += 1;
111                    in_failure_block = false;
112                    continue;
113                }
114                _ => {}
115            }
116        }
117
118        // Detect failure section header: `___ test_name ___` or `FAILED test_name`
119        if (trimmed.starts_with("___") && trimmed.ends_with("___"))
120            || trimmed.starts_with("FAILED ")
121        {
122            // Save previous failure block
123            if !current_failure.is_empty() {
124                let detail = current_failure.join("\n");
125                if !detail.trim().is_empty() {
126                    failure_details.push(detail);
127                }
128                current_failure.clear();
129            }
130            in_failure_block = true;
131            continue;
132        }
133
134        // Capture failure traceback lines (keep short, max 5 lines per failure)
135        if in_failure_block {
136            if current_failure.len() < 5 {
137                current_failure.push(trimmed.to_string());
138            }
139            continue;
140        }
141
142        // Parse summary line: `=== 42 passed, 1 failed in 3.21s ===`
143        if (trimmed.starts_with('=') || trimmed.starts_with('-'))
144            && (trimmed.contains("passed")
145                || trimmed.contains("failed")
146                || trimmed.contains("error"))
147        {
148            if let Some(d) = extract_duration(trimmed) {
149                duration = d;
150            }
151            // Also extract counters from summary as fallback
152            if let Some(n) = extract_counter(trimmed, " passed") {
153                if passed.is_empty() && n > 0 {
154                    // Use counter from summary if we didn't see individual lines
155                    for _ in 0..n {
156                        passed.push(String::new());
157                    }
158                }
159            }
160            if let Some(n) = extract_counter(trimmed, " failed") {
161                if failed.is_empty() && n > 0 {
162                    for _ in 0..n {
163                        failed.push(String::new());
164                    }
165                }
166            }
167            if let Some(n) = extract_counter(trimmed, " skipped") {
168                if skipped == 0 {
169                    skipped = n;
170                }
171            }
172            if let Some(n) = extract_counter(trimmed, " xfailed") {
173                if xfailed == 0 {
174                    xfailed = n;
175                }
176            }
177            if let Some(n) = extract_counter(trimmed, " xpassed") {
178                if xpassed == 0 {
179                    xpassed = n;
180                }
181            }
182            if let Some(n) = extract_counter(trimmed, " warning") {
183                warnings = n;
184            }
185            if let Some(n) = extract_counter(trimmed, " error") {
186                if errors == 0 {
187                    errors = n;
188                }
189            }
190        }
191    }
192
193    // Save last failure block
194    if !current_failure.is_empty() {
195        let detail = current_failure.join("\n");
196        if !detail.trim().is_empty() {
197            failure_details.push(detail);
198        }
199    }
200
201    let passed_count = passed.len() as u32;
202    let failed_count = failed.len() as u32;
203
204    if passed_count == 0 && failed_count == 0 && errors == 0 {
205        return None;
206    }
207
208    // Build compressed output
209    let mut result = String::from("pytest: ");
210
211    if failed_count == 0 && errors == 0 {
212        result.push_str(&format!("✓ {passed_count} passed"));
213    } else {
214        result.push_str(&format!("{passed_count} passed, {failed_count} failed"));
215    }
216
217    if skipped > 0 {
218        result.push_str(&format!(", {skipped} skipped"));
219    }
220    if xfailed > 0 {
221        result.push_str(&format!(", {xfailed} xfailed"));
222    }
223    if xpassed > 0 {
224        result.push_str(&format!(", {xpassed} xpassed"));
225    }
226    if errors > 0 {
227        result.push_str(&format!(", {errors} errors"));
228    }
229    if warnings > 0 {
230        result.push_str(&format!(", {warnings} warnings"));
231    }
232
233    if !duration.is_empty() {
234        result.push_str(&format!(" in {duration}"));
235    }
236
237    // Show passed test names when count is small (preserves identifiers for debugging)
238    let named_passed: Vec<&String> = passed.iter().filter(|s| !s.is_empty()).collect();
239    if !named_passed.is_empty() && named_passed.len() <= 10 {
240        let names: Vec<&str> = named_passed.iter().map(|s| s.as_str()).collect();
241        result.push_str(&format!("\n  ran: {}", names.join(", ")));
242    }
243
244    // Show failed test names (up to 5)
245    let named_failures: Vec<&String> = failed.iter().filter(|s| !s.is_empty()).collect();
246    if !named_failures.is_empty() {
247        for f in named_failures.iter().take(5) {
248            result.push_str(&format!("\n  FAIL: {f}"));
249        }
250        if named_failures.len() > 5 {
251            result.push_str(&format!("\n  ...+{} more", named_failures.len() - 5));
252        }
253    }
254
255    // Show failure details (up to 3 blocks, trimmed)
256    if !failure_details.is_empty() {
257        for detail in failure_details.iter().take(3) {
258            let short: String = detail.lines().take(3).collect::<Vec<_>>().join("\n");
259            result.push_str(&format!("\n  > {short}"));
260        }
261    }
262
263    Some(result)
264}
265
266/// Extracts the test status from a verbose pytest line.
267/// Handles lines like: `tests/test_auth.py::test_name PASSED                  [ 75%]`
268/// Returns the status keyword if found.
269fn extract_status(line: &str) -> Option<&'static str> {
270    const STATUSES: &[&str] = &["PASSED", "FAILED", "SKIPPED", "XFAIL", "XPASS", "ERROR"];
271    // Strip trailing percentage indicator and whitespace
272    let stripped = if let Some(bracket_pos) = line.rfind('[') {
273        if line[bracket_pos..].contains('%') {
274            line[..bracket_pos].trim()
275        } else {
276            line.trim()
277        }
278    } else {
279        line.trim()
280    };
281
282    STATUSES.iter().find(|&&s| stripped.ends_with(s)).copied()
283}
284
285/// Extracts the short test name from a verbose pytest line.
286/// Input: `tests/test_auth.py::TestLogin::test_expired_token PASSED                  [ 75%]`
287/// Output: `test_auth.py::test_expired_token`
288fn extract_test_name(line: &str) -> String {
289    let trimmed = line.trim();
290
291    // Strip trailing percentage indicator `[ 75%]`
292    let without_pct = if let Some(bracket_pos) = trimmed.rfind('[') {
293        if trimmed[bracket_pos..].contains('%') {
294            trimmed[..bracket_pos].trim()
295        } else {
296            trimmed
297        }
298    } else {
299        trimmed
300    };
301
302    // Remove the status suffix (PASSED, FAILED, etc.)
303    let name_part = without_pct
304        .rsplit_once(' ')
305        .map_or(without_pct, |(name, _status)| name.trim());
306
307    // Shorten: keep filename::test_name, drop intermediate path
308    if let Some(last_slash) = name_part.rfind('/') {
309        name_part[last_slash + 1..].to_string()
310    } else {
311        name_part.to_string()
312    }
313}
314
315fn extract_duration(line: &str) -> Option<String> {
316    // Look for "in X.XXs" pattern
317    if let Some(pos) = line.find(" in ") {
318        let after = &line[pos + 4..];
319        let dur: String = after
320            .chars()
321            .take_while(|c| c.is_ascii_digit() || *c == '.' || *c == 's' || *c == 'm')
322            .collect();
323        let dur = dur.trim_end_matches('=').trim().to_string();
324        if !dur.is_empty() {
325            return Some(dur);
326        }
327    }
328    None
329}
330
331fn extract_counter(line: &str, keyword: &str) -> Option<u32> {
332    let pos = line.find(keyword)?;
333    let before = &line[..pos];
334    let num_str = before.split_whitespace().last()?;
335    let clean: String = num_str.chars().filter(char::is_ascii_digit).collect();
336    clean.parse::<u32>().ok()
337}
338
339#[cfg(test)]
340mod tests {
341    use super::*;
342
343    #[test]
344    fn verbose_all_passed() {
345        let output = "\
346============================= test session starts ==============================
347platform linux -- Python 3.11.5, pytest-7.4.3, pluggy-1.3.0
348rootdir: /home/user/project
349configfile: pyproject.toml
350plugins: cov-4.1.0
351collecting ... collected 3 items
352
353tests/test_math.py::test_add PASSED                                      [ 33%]
354tests/test_math.py::test_subtract PASSED                                 [ 66%]
355tests/test_math.py::test_multiply PASSED                                 [100%]
356
357============================== 3 passed in 0.42s ===============================";
358
359        let result = compress("pytest -v", output).expect("should compress");
360        assert!(result.contains("✓ 3 passed"));
361        assert!(result.contains("0.42s"));
362        assert!(!result.contains("rootdir"));
363        assert!(!result.contains("collecting"));
364        assert!(!result.contains("platform"));
365    }
366
367    #[test]
368    fn verbose_mixed_results() {
369        let output = "\
370============================= test session starts ==============================
371platform linux -- Python 3.11.5, pytest-7.4.3
372collected 4 items
373
374tests/test_auth.py::test_login PASSED                                    [ 25%]
375tests/test_auth.py::test_logout PASSED                                   [ 50%]
376tests/test_auth.py::test_expired_token FAILED                            [ 75%]
377tests/test_auth.py::test_refresh SKIPPED                                 [100%]
378
379=========================== short test summary info ============================
380FAILED tests/test_auth.py::test_expired_token
381============================== 1 failed, 2 passed, 1 skipped in 1.23s ===============================";
382
383        let result = compress("pytest -v", output).expect("should compress");
384        assert!(result.contains("2 passed"));
385        assert!(result.contains("1 failed"));
386        assert!(result.contains("1 skipped"));
387        assert!(result.contains("FAIL:"));
388        assert!(result.contains("test_expired_token"));
389    }
390
391    #[test]
392    fn strips_fixture_lines() {
393        let output = "\
394============================= test session starts ==============================
395collected 2 items
396
397SETUP    S session_fixture
398tests/test_db.py::test_insert PASSED                                     [ 50%]
399TEARDOWN S session_fixture
400tests/test_db.py::test_query PASSED                                      [100%]
401
402============================== 2 passed in 0.31s ===============================";
403
404        let result = compress("pytest -v --setup-show", output).expect("should compress");
405        assert!(result.contains("✓ 2 passed"));
406        assert!(!result.contains("SETUP"));
407        assert!(!result.contains("TEARDOWN"));
408    }
409
410    #[test]
411    fn strips_collection_lines() {
412        let output = "\
413============================= test session starts ==============================
414platform linux -- Python 3.11.5
415collecting ... collected 5 items
416<Module tests/test_api.py>
417  <Class TestUsers>
418    <Function test_list>
419    <Function test_create>
420
421tests/test_api.py::TestUsers::test_list PASSED                           [ 20%]
422tests/test_api.py::TestUsers::test_create PASSED                         [ 40%]
423tests/test_api.py::TestUsers::test_delete PASSED                         [ 60%]
424tests/test_api.py::TestUsers::test_update PASSED                         [ 80%]
425tests/test_api.py::TestUsers::test_get PASSED                            [100%]
426
427============================== 5 passed in 2.10s ===============================";
428
429        let result = compress("pytest -v --collect-only", output).expect("should compress");
430        assert!(result.contains("✓ 5 passed"));
431        assert!(!result.contains("<Module"));
432        assert!(!result.contains("<Class"));
433        assert!(!result.contains("<Function"));
434        assert!(!result.contains("collecting"));
435    }
436
437    #[test]
438    fn non_pytest_returns_none() {
439        let output = "Hello world\nThis is not pytest output\n";
440        assert!(compress("echo hello", output).is_none());
441    }
442
443    #[test]
444    fn failure_with_traceback() {
445        let output = "\
446============================= test session starts ==============================
447collected 2 items
448
449tests/test_calc.py::test_divide PASSED                                   [ 50%]
450tests/test_calc.py::test_divide_zero FAILED                              [100%]
451
452=================================== FAILURES ===================================
453___________________________ test_divide_zero ___________________________________
454
455    def test_divide_zero():
456>       assert divide(1, 0) == 0
457E       ZeroDivisionError: division by zero
458
459src/calc.py:10: ZeroDivisionError
460=========================== short test summary info ============================
461FAILED tests/test_calc.py::test_divide_zero
462============================== 1 failed, 1 passed in 0.15s ===============================";
463
464        let result = compress("pytest -v --tb=short", output).expect("should compress");
465        assert!(result.contains("1 passed"));
466        assert!(result.contains("1 failed"));
467        assert!(result.contains("FAIL:"));
468        assert!(result.contains("test_divide_zero"));
469    }
470}