use super::*;
use std::path::{Path, PathBuf};
/// Writes `contents` to `path`, creating any missing parent directories first.
///
/// Returns `Err` with a message naming the failing operation and path when
/// either the directory creation or the write itself fails.
fn write_file(path: &Path, contents: &str) -> Result<(), String> {
    // A path without a parent component skips directory creation entirely.
    path.parent()
        .map(|dir| {
            std::fs::create_dir_all(dir)
                .map_err(|err| format!("create_dir_all({}): {err}", dir.display()))
        })
        .transpose()?;
    std::fs::write(path, contents).map_err(|err| format!("write({}): {err}", path.display()))
}
/// Creates a scratch directory under the system temp dir and returns its path.
///
/// The directory name combines `label`, the current process id, and the
/// current time in nanoseconds since the Unix epoch. Fails with a message if
/// the clock reads before the epoch or the directory cannot be created.
fn unique_tempdir(label: &str) -> Result<PathBuf, String> {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_err(|err| format!("system time: {err}"))?;
    let nanos = since_epoch.as_nanos();
    let dir_name = format!(
        "ripr-python-coverage-{label}-{}-{nanos}",
        std::process::id()
    );
    let dir = std::env::temp_dir().join(dir_name);
    match std::fs::create_dir_all(&dir) {
        Ok(()) => Ok(dir),
        Err(err) => Err(format!("create_dir_all({}): {err}", dir.display())),
    }
}
/// Extracts tests from `source` (parsed as `tests/test_inline.py`) and returns
/// the `(oracle kind, oracle strength)` pair of every assertion, in extraction
/// order across all tests.
fn assertion_oracles(source: &str) -> Vec<(OracleKind, OracleStrength)> {
    extract_tests(Path::new("tests/test_inline.py"), source)
        .into_iter()
        .flat_map(|test| test.assertions)
        .map(|assertion| (assertion.oracle_kind, assertion.oracle_strength))
        .collect()
}
/// Asserts nested in `if`/`for`/`while`/`with`/`try`/`match` bodies (including
/// their `else`/`except`/`finally` arms) must be collected as exact-value
/// oracles.
// NOTE(review): the fixture holds 13 `assert` statements while the threshold
// below is 12 — confirm whether the one-assert slack is intentional.
#[test]
fn collect_assertions_walks_control_flow_bodies() -> Result<(), String> {
    let source = r#"
def test_walks_control_flow():
if value:
assert value == 1
else:
assert value == 2
for item in items:
assert item == 0
else:
assert items == []
while count:
assert count == 5
else:
assert count == 0
with open("p") as f:
assert f.read() == "ok"
try:
assert raw == 7
except ValueError:
assert handled == 8
else:
assert orelse == 9
finally:
assert finalbody == 10
match value:
case 1:
assert value == 11
case _:
assert value == 12
"#;
    let tests = extract_tests(Path::new("tests/test_walks.py"), source);
    let only = match tests.as_slice() {
        [only] => only,
        other => return Err(format!("expected single test, got {}", other.len())),
    };
    // Tally the exact-value oracles by hand.
    let mut exact = 0usize;
    for assertion in only.assertions.iter() {
        if assertion.oracle_kind == OracleKind::ExactValue {
            exact += 1;
        }
    }
    if exact >= 12 {
        Ok(())
    } else {
        Err(format!(
            "expected at least 12 exact-value assertions across nested control flow, got {exact}"
        ))
    }
}
/// Asserts inside `async for`, `async with`, and `except*` bodies of an async
/// test must all be collected, and each must be an exact-value oracle.
#[test]
fn collect_assertions_walks_try_star_and_async_for_and_async_with() -> Result<(), String> {
    let source = r#"
async def test_walks_async_and_try_star():
async for chunk in stream:
assert chunk == "ok"
async with lock:
assert held == 1
try:
await do()
except* RuntimeError:
assert grouped == 2
"#;
    let tests = extract_tests(Path::new("tests/test_async.py"), source);
    let only = match tests.as_slice() {
        [only] => only,
        other => return Err(format!("expected single async test, got {}", other.len())),
    };
    let mut kinds: Vec<&OracleKind> = Vec::new();
    for assertion in only.assertions.iter() {
        kinds.push(&assertion.oracle_kind);
    }
    if kinds.len() < 3 {
        return Err(format!("expected three nested asserts, got {:?}", kinds));
    }
    let has_non_exact = kinds.iter().any(|kind| **kind != OracleKind::ExactValue);
    if has_non_exact {
        return Err(format!(
            "expected all nested asserts to be exact-value, got {:?}",
            kinds
        ));
    }
    Ok(())
}
/// A `with pytest.raises(...)` context manager must register at least one
/// `BroadError` oracle with `Weak` strength.
#[test]
fn collect_with_item_assertions_extracts_pytest_raises_in_context() -> Result<(), String> {
    let source = r#"
import pytest
def test_with_item_assertion():
with pytest.raises(ValueError):
do_thing()
"#;
    let oracles = assertion_oracles(source);
    // Succeed as soon as one matching oracle is seen.
    for (kind, strength) in oracles.iter() {
        if matches!(kind, OracleKind::BroadError) && *strength == OracleStrength::Weak {
            return Ok(());
        }
    }
    Err(format!(
        "expected pytest.raises(...) context manager to register BroadError oracle, got {:?}",
        oracles
    ))
}
/// Exercises the unittest assertion methods and mock expectation calls in one
/// fixture and checks the minimum number of oracles extracted per kind.
#[test]
fn oracle_for_call_recognizes_all_unittest_and_mock_variants() -> Result<(), String> {
    let source = r#"
import unittest
class CaseAll(unittest.TestCase):
def test_variants(self):
self.assertEqual(actual, 1)
self.assertNotEqual(actual, 2)
self.assertTrue(actual)
self.assertFalse(actual)
with self.assertRaises(ValueError):
do_one()
with self.assertRaisesRegex(ValueError, "bad"):
do_two()
mock.assert_called()
mock.assert_called_once()
mock.assert_called_with(1)
mock.assert_called_once_with(1)
mock.assert_any_call(1)
mock.assert_has_calls([call(1)])
mock.assert_not_called()
unknown_call(actual)
"#;
    let oracles = assertion_oracles(source);
    // One counting routine shared by every oracle kind under test.
    let count_where = |is_wanted: fn(&OracleKind) -> bool| {
        oracles.iter().filter(|(kind, _)| is_wanted(kind)).count()
    };
    let strong = count_where(|kind| matches!(kind, OracleKind::ExactValue));
    let relational = count_where(|kind| matches!(kind, OracleKind::RelationalCheck));
    let smoke = count_where(|kind| matches!(kind, OracleKind::SmokeOnly));
    let broad_error = count_where(|kind| matches!(kind, OracleKind::BroadError));
    let mock_expectations = count_where(|kind| matches!(kind, OracleKind::MockExpectation));

    if strong < 1 {
        Err(format!(
            "expected at least one ExactValue oracle (assertEqual), got {:?}",
            oracles
        ))
    } else if relational < 1 {
        Err(format!(
            "expected at least one RelationalCheck oracle (assertNotEqual), got {:?}",
            oracles
        ))
    } else if smoke < 2 {
        Err(format!(
            "expected assertTrue + assertFalse smoke-only oracles, got {:?}",
            oracles
        ))
    } else if broad_error < 2 {
        Err(format!(
            "expected assertRaises + assertRaisesRegex broad-error oracles, got {:?}",
            oracles
        ))
    } else if mock_expectations < 7 {
        Err(format!(
            "expected seven mock expectation oracles, got {} (oracles: {:?})",
            mock_expectations, oracles
        ))
    } else {
        Ok(())
    }
}
/// A bare-name `assert flag` must be classified as `SmokeOnly` with `Smoke`
/// strength.
#[test]
fn oracle_for_assert_expr_falls_back_to_smoke_for_bare_name() -> Result<(), String> {
    let source = r#"
def test_bare_truthy():
assert flag
"#;
    let oracles = assertion_oracles(source);
    match oracles.first() {
        None => Err("expected one assertion".to_string()),
        Some(bare) if bare.0 != OracleKind::SmokeOnly || bare.1 != OracleStrength::Smoke => {
            Err(format!(
                "bare-name assertion should be SmokeOnly/Smoke, got {:?}",
                bare
            ))
        }
        Some(_) => Ok(()),
    }
}
/// Table-driven check of `classify_probe_shape` across predicate lines,
/// error-path lines, and a handful of individually expected shapes.
#[test]
fn classify_probe_shape_covers_all_python_branches() {
    // Control-flow headers: expected Predicate / Control.
    for line in [
        " elif amount > 0:",
        " while remaining:",
        " for entry in items:",
        " match command:",
        " case Cmd.Pay():",
    ] {
        let (family, delta) = classify_probe_shape(line);
        assert_eq!(family, ProbeFamily::Predicate, "predicate for `{line}`");
        assert_eq!(delta, DeltaKind::Control, "predicate delta for `{line}`");
    }

    // Exception-related lines: expected ErrorPath / Control.
    for line in [
        " raise",
        " try:",
        " except* RuntimeError:",
        " finally:",
        " with pytest.raises(ValueError):",
    ] {
        let (family, delta) = classify_probe_shape(line);
        assert_eq!(family, ProbeFamily::ErrorPath, "error path for `{line}`");
        assert_eq!(delta, DeltaKind::Control, "error delta for `{line}`");
    }

    // Remaining shapes, each paired with its expected classification.
    let individual = [
        (" return", ProbeFamily::ReturnValue, DeltaKind::Value),
        (
            " handle = service.handler()",
            ProbeFamily::SideEffect,
            DeltaKind::Effect,
        ),
        (
            " await pump.push(event)",
            ProbeFamily::SideEffect,
            DeltaKind::Effect,
        ),
        (" \"docstring\"", ProbeFamily::Predicate, DeltaKind::Control),
        (
            " callback = Mock(name=\"sent\")",
            ProbeFamily::SideEffect,
            DeltaKind::Effect,
        ),
    ];
    for (text, want_family, want_delta) in individual {
        let (family, delta) = classify_probe_shape(text);
        assert_eq!(family, want_family);
        assert_eq!(delta, want_delta);
    }
}
/// `body_calls_owner` must ignore commented-out calls, mentions inside an open
/// string, and longer identifiers that merely end with the owner name, while
/// still matching a genuine call.
#[test]
fn body_calls_owner_filters_comments_and_string_mentions() {
    let owner = PythonOwner {
        file: PathBuf::from("src/pricing.py"),
        name: "apply_discount".to_string(),
        qualified_name: "apply_discount".to_string(),
        owner_kind: OwnerKind::Function,
        start_line: 1,
        end_line: 5,
        decorators: Vec::new(),
        imports: Vec::new(),
    };

    // Bodies that must NOT count as calling the owner, with the reason why.
    let rejected = [
        (
            " # apply_discount(100)\n other()\n",
            "matches on commented-out call sites should be filtered",
        ),
        (
            " note = \"call apply_discount(\"\n other()\n",
            "matches inside an open string should be filtered",
        ),
        (
            " not_apply_discount(1)\n",
            "identifier-prefixed names should not match call boundary",
        ),
    ];
    for (body, why) in rejected {
        assert!(!body_calls_owner(body, &owner), "{}", why);
    }

    let real = " result = apply_discount(100)\n";
    let called = body_calls_owner(real, &owner);
    assert!(called, "real top-level call should match");
}
/// `has_unclosed_quote` must report balanced input as closed and dangling
/// quotes as open, including escaped quotes and a lone backslash.
#[test]
fn has_unclosed_quote_handles_escapes_and_nested_quotes() {
    let balanced = ["\"closed\"", "'closed'", "\"escape \\\" here\"", "\\"];
    for text in balanced {
        assert!(!has_unclosed_quote(text));
    }

    let dangling = ["\"open", "'open", "'still \"open"];
    for text in dangling {
        assert!(has_unclosed_quote(text));
    }
}
/// An attribute call on any receiver must match; the same call inside a
/// comment must not.
#[test]
fn contains_any_attribute_call_matches_arbitrary_receiver() {
    let method = "apply_discount";
    let live_call = contains_any_attribute_call(" order.apply_discount(100)\n", method);
    let commented_call = contains_any_attribute_call(" # order.apply_discount(100)\n", method);
    assert!(live_call);
    assert!(!commented_call);
}
/// `is_test_file` must accept `tests/`/`test/` directories and `test_*`/`*_test`
/// file names, and reject ordinary modules such as `testing.py`.
#[test]
fn is_test_file_recognizes_file_and_directory_conventions() {
    let recognized = [
        "tests/foo.py",
        "test/foo.py",
        "src/test_thing.py",
        "src/thing_test.py",
        "nested/tests/sub/utility.py",
    ];
    for candidate in recognized {
        assert!(is_test_file(Path::new(candidate)));
    }

    let ordinary = ["src/pricing.py", "src/testing.py"];
    for candidate in ordinary {
        assert!(!is_test_file(Path::new(candidate)));
    }
}
/// The first test extracted from a class must be tagged `unittest` when the
/// base is `TestCase` or `unittest.TestCase`, and `pytest` otherwise.
#[test]
fn is_unittest_class_accepts_bare_and_dotted_test_case_bases() -> Result<(), String> {
    // (file, source, expected framework, complaint when it does not match)
    let scenarios = [
        (
            "tests/test_bare.py",
            "import unittest\nclass A(TestCase):\n def test_a(self):\n pass\n",
            "unittest",
            "bare `TestCase` base should mark unittest framework",
        ),
        (
            "tests/test_dotted.py",
            "import unittest\nclass B(unittest.TestCase):\n def test_b(self):\n pass\n",
            "unittest",
            "`unittest.TestCase` base should mark unittest framework",
        ),
        (
            "tests/test_neither.py",
            "class C(object):\n def test_c(self):\n pass\n",
            "pytest",
            "non-TestCase base should fall back to pytest framework",
        ),
    ];
    for (file, source, expected, complaint) in scenarios {
        let extracted = extract_tests(Path::new(file), source);
        if extracted.first().map(|t| t.framework) != Some(expected) {
            return Err(complaint.to_string());
        }
    }
    Ok(())
}
/// `normalized_path` must drop a leading `./` and leave already-normalized
/// relative paths untouched.
#[test]
fn normalized_path_strips_dot_slash_prefix_and_normalizes_separators() {
    let expectations = [
        ("./src/foo.py", "src/foo.py"),
        ("src/foo.py", "src/foo.py"),
        ("crates/ripr/src/lib.py", "crates/ripr/src/lib.py"),
    ];
    for (input, normalized) in expectations {
        assert_eq!(normalized_path(Path::new(input)), normalized);
    }
}
/// `text_for_range` must return an empty string for a range entirely past the
/// end of the source, and the whole source when only the end overshoots.
#[test]
fn text_for_range_clamps_out_of_bounds_offsets() {
    use rustpython_parser::text_size::{TextRange, TextSize};
    let span =
        |start: u32, end: u32| TextRange::new(TextSize::from(start), TextSize::from(end));
    let source = "abc";
    assert_eq!(text_for_range(source, span(10_u32, 99_u32)), "");
    assert_eq!(text_for_range(source, span(0_u32, 99_u32)), "abc");
}
/// `line_for_offset` must report 1-based line numbers, including for an offset
/// far beyond the end of the source.
#[test]
fn line_for_offset_counts_newlines() {
    let source = "alpha\nbeta\ngamma";
    // (byte offset, expected line)
    let samples = [(0, 1), (7, 2), (999, 3)];
    for (offset, expected_line) in samples {
        assert_eq!(line_for_offset(source, offset), expected_line);
    }
}
/// A call followed by `;` or trailing whitespace still looks like a call; a
/// bare name or an unterminated `(` does not.
#[test]
fn looks_like_call_expression_handles_trailing_semicolons_and_whitespace() {
    for text in ["notify(event);", "notify(event) "] {
        assert!(looks_like_call_expression(text));
    }
    for text in ["notify", "notify("] {
        assert!(!looks_like_call_expression(text));
    }
}
/// Both `Mock(...)` and `MagicMock(...)` initializers must be detected; an
/// ordinary call must not.
#[test]
fn contains_mock_initializer_recognizes_both_constructors() {
    let initializers = ["callback = Mock(name='x')", "callback = MagicMock(name='y')"];
    for line in initializers {
        assert!(contains_mock_initializer(line));
    }
    let plain_call = contains_mock_initializer("notify(payload)");
    assert!(!plain_call);
}
/// An import counts as a mock constructor when either its imported name or its
/// alias is a known mock constructor; unrelated imports do not.
#[test]
fn is_known_mock_constructor_import_matches_imported_and_aliased() {
    let import_of = |imported: &str, alias: &str| PythonImport {
        imported: imported.to_string(),
        alias: alias.to_string(),
    };

    let recognized = [
        import_of("Mock", "Mock"),
        import_of("MagicMock", "MM"),
        import_of("Other", "Mock"),
    ];
    for import in recognized.iter() {
        assert!(is_known_mock_constructor_import(import));
    }

    let unrelated = import_of("json", "json");
    assert!(!is_known_mock_constructor_import(&unrelated));
}
/// Tests that use `monkeypatch.setitem` or `monkeypatch.delattr` must each
/// produce a `MockedModule` static limit for the related owner.
#[test]
fn static_limit_detects_monkeypatch_setitem_and_delattr() -> Result<(), String> {
    let owner = match extract_owners(Path::new("src/service.py"), "def total():\n return 1\n")
        .into_iter()
        .next()
    {
        Some(owner) => owner,
        None => return Err("missing owner".to_string()),
    };

    // Build the fixture and its candidates for each monkeypatch variant.
    let setitem_tests = extract_tests(
        Path::new("tests/test_setitem.py"),
        "from src.service import total\n\ndef test_total(monkeypatch):\n monkeypatch.setitem({}, \"key\", lambda: 1)\n assert total() == 1\n",
    );
    let setitem_candidates = related_test_candidates(&owner, &setitem_tests);
    let delattr_tests = extract_tests(
        Path::new("tests/test_delattr.py"),
        "from src.service import total\n\ndef test_total(monkeypatch):\n monkeypatch.delattr(\"src.service.helper\")\n assert total() == 1\n",
    );
    let delattr_candidates = related_test_candidates(&owner, &delattr_tests);

    // Both limits must exist before either kind is inspected.
    let setitem = match static_limit_for_change(" return total()", &owner, &setitem_candidates) {
        Some(limit) => limit,
        None => return Err("expected MockedModule for monkeypatch.setitem".to_string()),
    };
    let delattr = match static_limit_for_change(" return total()", &owner, &delattr_candidates) {
        Some(limit) => limit,
        None => return Err("expected MockedModule for monkeypatch.delattr".to_string()),
    };

    if setitem.kind != StaticLimitKind::MockedModule {
        Err(format!(
            "expected MockedModule for monkeypatch.setitem, got {:?}",
            setitem.kind
        ))
    } else if delattr.kind != StaticLimitKind::MockedModule {
        Err(format!(
            "expected MockedModule for monkeypatch.delattr, got {:?}",
            delattr.kind
        ))
    } else {
        Ok(())
    }
}
/// A plain `return` in an undecorated owner with no candidate tests must not
/// yield any static limit.
#[test]
fn static_limit_returns_none_when_no_limits_apply() -> Result<(), String> {
    let owners = extract_owners(Path::new("src/service.py"), "def total():\n return 1\n");
    let owner = owners
        .into_iter()
        .next()
        .ok_or_else(|| "missing owner".to_string())?;
    let limit = static_limit_for_change(" return 1", &owner, &[]);
    assert!(
        limit.is_none(),
        "plain return without indirection should not raise a static_limit"
    );
    Ok(())
}
/// With `@staticmethod` stacked above `@retry(times=3)`, the static limit must
/// be `DecoratorIndirection` and its evidence must mention `retry`.
#[test]
fn static_limit_picks_first_non_transparent_decorator() -> Result<(), String> {
    let owners = extract_owners(
        Path::new("src/service.py"),
        "class Service:\n @staticmethod\n @retry(times=3)\n def total():\n return 1\n",
    );
    let owner = match owners.into_iter().next() {
        Some(owner) => owner,
        None => return Err("missing owner".to_string()),
    };
    let limit = match static_limit_for_change(" return 1", &owner, &[]) {
        Some(limit) => limit,
        None => return Err("expected DecoratorIndirection limit".to_string()),
    };

    if limit.kind != StaticLimitKind::DecoratorIndirection {
        Err(format!(
            "expected DecoratorIndirection, got {:?}",
            limit.kind
        ))
    } else if !limit.evidence.contains("retry") {
        Err(format!(
            "expected evidence to name the `retry` decorator, got {}",
            limit.evidence
        ))
    } else {
        Ok(())
    }
}
/// Imports of `src.pricing` and `pricing` must match an owner in
/// `src/pricing.py`; an import of `src.tax` must not.
#[test]
fn imported_module_matches_owner_compares_last_segment_to_owner_stem() {
    let module = |imported: &str, alias: &str| PythonImport {
        imported: imported.to_string(),
        alias: alias.to_string(),
    };
    let owner = PythonOwner {
        file: PathBuf::from("src/pricing.py"),
        name: "apply_discount".to_string(),
        qualified_name: "apply_discount".to_string(),
        owner_kind: OwnerKind::Function,
        start_line: 1,
        end_line: 1,
        decorators: Vec::new(),
        imports: Vec::new(),
    };

    let dotted_matches = imported_module_matches_owner(&module("src.pricing", "pricing"), &owner);
    let plain_matches = imported_module_matches_owner(&module("pricing", "pricing"), &owner);
    let mismatched_matches = imported_module_matches_owner(&module("src.tax", "tax"), &owner);

    assert!(dotted_matches);
    assert!(plain_matches);
    assert!(!mismatched_matches);
}
/// An owner whose file path is empty has no stem to compare, so
/// `same_stem_related` must report no relation.
#[test]
fn same_stem_related_handles_missing_stems() {
    let test = PythonTest {
        file: PathBuf::from("tests/test_pricing.py"),
        name: "test_x".to_string(),
        line: 1,
        framework: "pytest",
        parametrized: false,
        body_text: String::new(),
        imports: Vec::new(),
        decorators: Vec::new(),
        assertions: Vec::new(),
    };
    // The empty path is the point of this fixture.
    let owner = PythonOwner {
        file: PathBuf::from(""),
        name: "apply_discount".to_string(),
        qualified_name: "apply_discount".to_string(),
        owner_kind: OwnerKind::Function,
        start_line: 1,
        end_line: 1,
        decorators: Vec::new(),
        imports: Vec::new(),
    };
    let related = same_stem_related(&test, &owner);
    assert!(!related);
}
/// A changed line number that falls outside every extracted owner must yield
/// no finding.
#[test]
fn classify_change_returns_none_when_line_outside_any_owner() {
    let production = Path::new("src/pricing.py");
    let tests = Vec::new();
    let owners = extract_owners(
        production,
        "def apply_discount(amount):\n return amount - 10\n",
    );
    // Line 999 lies far beyond the two-line fixture.
    let finding = classify_change(production, 999, " return amount - 10", &owners, &tests);
    assert!(finding.is_none());
}
/// Owners extracted from one file must not produce a finding for a change in
/// a different file, even when the line number would be in range.
#[test]
fn classify_change_returns_none_for_different_file_owners() {
    let tests = Vec::new();
    let owners = extract_owners(
        Path::new("src/pricing.py"),
        "def apply_discount(amount):\n return amount - 10\n",
    );
    let changed_file = Path::new("src/elsewhere.py");
    let finding = classify_change(changed_file, 2, " return amount - 10", &owners, &tests);
    assert!(finding.is_none());
}
/// End-to-end check of `PythonAdapter::analyze_diff` against files on disk.
///
/// Writes a production module and a test module into a fresh temp directory,
/// runs the adapter over three changed files (production, test, `README.md`),
/// and expects two accepted changed files and exactly one `Exposed` finding
/// tagged as Python / Preview.
#[test]
fn analyze_diff_emits_finding_for_changed_python_file_on_disk() -> Result<(), String> {
let root = unique_tempdir("analyze-diff-finding")?;
let production_rel = PathBuf::from("src/pricing.py");
let test_rel = PathBuf::from("tests/test_pricing.py");
// Fixture: a production function with one branch, and a test that calls it
// and compares the result with `==`.
write_file(
&root.join(&production_rel),
"def apply_discount(amount):\n if amount >= 100:\n return amount - 10\n return amount\n",
)?;
write_file(
&root.join(&test_rel),
"from src.pricing import apply_discount\n\ndef test_apply_discount():\n assert apply_discount(100) == 90\n",
)?;
let adapter = PythonAdapter;
let options = AnalysisOptions {
root: root.clone(),
base: None,
diff_file: None,
mode: crate::analysis::AnalysisMode::Draft,
include_unchanged_tests: false,
};
let policy = OraclePolicy::default();
// Three changed files are supplied; the `changed_files` check below expects
// only the production and test files to be accepted.
let changed_files = vec![
ChangedFile {
path: production_rel.clone(),
added_lines: vec![crate::analysis::diff::ChangedLine {
line: 2,
text: " if amount >= 100:".to_string(),
}],
removed_lines: Vec::new(),
},
ChangedFile {
path: test_rel.clone(),
added_lines: vec![crate::analysis::diff::ChangedLine {
line: 1,
text: "from src.pricing import apply_discount".to_string(),
}],
removed_lines: Vec::new(),
},
ChangedFile {
path: PathBuf::from("README.md"),
added_lines: Vec::new(),
removed_lines: Vec::new(),
},
];
let result = adapter.analyze_diff(&options, &policy, &changed_files);
// Remove the temp directory before propagating the analysis error, so a
// failing analysis does not leave the fixture behind.
let cleanup = std::fs::remove_dir_all(&root);
let result = result?;
cleanup.map_err(|err| format!("remove_dir_all({}): {err}", root.display()))?;
if result.changed_files != 2 {
return Err(format!(
"expected two accepted changed files (production + test), got {}",
result.changed_files
));
}
if result.findings.len() != 1 {
return Err(format!(
"expected exactly one finding from the production diff line, got {}",
result.findings.len()
));
}
let finding = &result.findings[0];
if finding.class != ExposureClass::Exposed {
return Err(format!(
"expected an exposed finding when the related test has a strong oracle, got {:?}",
finding.class
));
}
// Language metadata attached to the finding.
if finding.language != Some(DomainLanguageId::Python) {
return Err("language metadata should be Python".to_string());
}
if finding.language_status != Some(LanguageStatus::Preview) {
return Err("language status should be Preview".to_string());
}
Ok(())
}
/// `collect_workspace_python_files` must return the Python files under
/// ordinary directories and omit everything listed in `excluded` below.
///
/// The fixture tree is written into a fresh temp directory, walked once, and
/// removed before the results are checked.
#[test]
fn collect_workspace_python_files_skips_excluded_directories() -> Result<(), String> {
let root = unique_tempdir("workspace-walk")?;
let included = [
PathBuf::from("src/keep.py"),
PathBuf::from("nested/also_keep.py"),
];
// Paths the walker is expected to omit: `.py` files under tool/cache/venv
// directories, plus two files without a `.py` extension.
let excluded = [
PathBuf::from(".git/skip.py"),
PathBuf::from("target/skip.py"),
PathBuf::from("node_modules/skip.py"),
PathBuf::from(".ripr/skip.py"),
PathBuf::from(".direnv/skip.py"),
PathBuf::from("__pycache__/skip.py"),
PathBuf::from(".venv/skip.py"),
PathBuf::from("venv/skip.py"),
PathBuf::from("env/skip.py"),
PathBuf::from(".mypy_cache/skip.py"),
PathBuf::from("src/keep.rs"),
PathBuf::from("docs/README.md"),
];
for rel in included.iter().chain(excluded.iter()) {
write_file(&root.join(rel), "x = 1\n")?;
}
let files = collect_workspace_python_files(&root);
// The tree is removed right after the walk; the outcome is kept so it can
// be reported once the expectations have been evaluated.
let cleanup = std::fs::remove_dir_all(&root);
// The comparison below is against the relative fixture paths, so the
// walker's results are treated as relative to `root`.
for expected in included.iter() {
if !files.iter().any(|path| path == expected) {
// `let _ =` does not move `cleanup`; the removal outcome is simply not
// inspected on this early-exit path.
let _ = cleanup;
return Err(format!(
"expected workspace walker to include {} (got {:?})",
expected.display(),
files
));
}
}
let still_present_excluded: Vec<_> = excluded
.iter()
.filter(|expected| files.iter().any(|path| path == *expected))
.collect();
// A failed cleanup is reported before any excluded-path violation.
cleanup.map_err(|err| format!("remove_dir_all({}): {err}", root.display()))?;
if !still_present_excluded.is_empty() {
return Err(format!(
"workspace walker should skip excluded paths but included {:?}",
still_present_excluded
));
}
Ok(())
}
/// Walking a root directory that does not exist must yield no files.
///
/// The missing path is built under the platform temp directory (the same
/// location `unique_tempdir` uses) rather than a hard-coded `/tmp`, so the
/// test does not assume a Unix filesystem layout. The directory is never
/// created; the process id and nanosecond timestamp in its name keep it from
/// colliding with anything that already exists.
#[test]
fn collect_workspace_python_files_returns_empty_for_missing_root() {
    // A clock reading before the epoch falls back to 0; the process id still
    // contributes to the name in that case.
    let nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.as_nanos())
        .unwrap_or(0);
    let missing = std::env::temp_dir().join(format!(
        "ripr-python-coverage-missing-{}-{nanos}",
        std::process::id()
    ));
    assert!(collect_workspace_python_files(&missing).is_empty());
}
/// A test in `tests/test_pricing.py` that never calls the owner must still be
/// related to an owner in `src/pricing.py`, via the `SameStem` relation.
#[test]
fn related_test_matching_falls_back_to_same_stem_when_no_call() {
    let tests = extract_tests(
        Path::new("tests/test_pricing.py"),
        "def test_unrelated():\n do_something_else()\n",
    );
    let owners = extract_owners(
        Path::new("src/pricing.py"),
        "def apply_discount(amount):\n return amount - 10\n",
    );
    let owner = &owners[0];
    let candidates = related_test_candidates(owner, &tests);
    assert_eq!(candidates.len(), 1);
    let sole = &candidates[0];
    assert_eq!(
        sole.relation,
        PythonRelationKind::SameStem,
        "same-stem proximity should kick in when no direct or import-alias call is seen"
    );
}
/// Source that cannot be parsed must produce no owners.
#[test]
fn extract_owners_returns_empty_when_source_is_unparseable() {
    let broken_source = "def !!!";
    assert!(extract_owners(Path::new("src/oops.py"), broken_source).is_empty());
}
/// Source that cannot be parsed must produce no tests.
#[test]
fn extract_tests_returns_empty_when_source_is_unparseable() {
    let broken_source = "def !!!";
    assert!(extract_tests(Path::new("tests/test_oops.py"), broken_source).is_empty());
}
/// Ranking the same candidates twice must produce the same `(file, name)`
/// sequence, and at least three candidates must be found for the fixture.
#[test]
fn related_test_candidates_break_ties_by_oracle_then_file_then_name() {
    let mut owners = extract_owners(
        Path::new("src/pricing.py"),
        "def apply_discount(amount):\n return amount - 10\n",
    );
    let owner = owners.remove(0);

    // Two test files; every test carries the same assertion text.
    let mut tests = extract_tests(
        Path::new("tests/test_pricing.py"),
        "def test_alpha():\n assert 1 == 1\n\ndef test_beta():\n assert 1 == 1\n",
    );
    let sibling_tests = extract_tests(
        Path::new("tests/pricing_test.py"),
        "def test_alpha():\n assert 1 == 1\n",
    );
    tests.extend(sibling_tests);

    let candidates = related_test_candidates(&owner, &tests);
    assert!(
        candidates.len() >= 3,
        "expected at least three same-stem candidates, got {}",
        candidates.len()
    );
    let mut first_pass: Vec<(String, String)> = Vec::new();
    for candidate in candidates.iter() {
        let file = candidate.test.file.display().to_string();
        let name = candidate.test.name.clone();
        first_pass.push((file, name));
    }

    let second_pass = related_test_candidates(&owner, &tests);
    let mut second_keys: Vec<(String, String)> = Vec::new();
    for candidate in second_pass.iter() {
        let file = candidate.test.file.display().to_string();
        let name = candidate.test.name.clone();
        second_keys.push((file, name));
    }

    assert_eq!(first_pass, second_keys, "sort must be stable across runs");
}
/// A parametrized test with no extracted assertion must be reported with the
/// `pytest.mark.parametrize` oracle text and an `Unknown` oracle kind.
#[test]
fn find_related_tests_marks_parametrized_test_when_no_assertion_extracted() -> Result<(), String> {
    let test_source = r#"
import pytest
@pytest.mark.parametrize("amount", [1, 2])
def test_apply_discount(amount):
apply_discount(amount)
"#;
    let mut owners = extract_owners(
        Path::new("src/pricing.py"),
        "def apply_discount(amount):\n return amount - 10\n",
    );
    let owner = owners.remove(0);
    let tests = extract_tests(Path::new("tests/test_pricing.py"), test_source);

    let related = find_related_tests(&owner, &tests);
    if related.len() != 1 {
        return Err(format!(
            "expected one related test for parametrized matcher, got {}",
            related.len()
        ));
    }
    let only = &related[0];
    if only.oracle.as_deref() != Some("pytest.mark.parametrize") {
        Err(format!(
            "expected parametrize-marker oracle text, got {:?}",
            only.oracle
        ))
    } else if only.oracle_kind != OracleKind::Unknown {
        Err(format!(
            "parametrize fallback should keep Unknown oracle kind, got {:?}",
            only.oracle_kind
        ))
    } else {
        Ok(())
    }
}
/// `mock.patch` and bare `patch` decorators must mark a test as mocking a
/// module; an unrelated decorator such as `pytest.mark.skip` must not.
#[test]
fn test_has_mocked_module_recognizes_dotted_patch_decorator() {
    // Builds a minimal pytest-style test that carries a single decorator.
    let decorated = |name: &str, file: &str, decorator: &str| PythonTest {
        name: name.to_string(),
        file: PathBuf::from(file),
        line: 1,
        body_text: String::new(),
        imports: Vec::new(),
        decorators: vec![decorator.to_string()],
        parametrized: false,
        framework: "pytest",
        assertions: Vec::new(),
    };

    let mocked = decorated("test_x", "tests/test_x.py", "mock.patch");
    assert!(test_has_mocked_module(&mocked));

    let bare = decorated("test_y", "tests/test_y.py", "patch");
    assert!(test_has_mocked_module(&bare));

    let clean = decorated("test_z", "tests/test_z.py", "pytest.mark.skip");
    assert!(!test_has_mocked_module(&clean));
}
/// Calling the result of a subscript (`registry[key]()`) is dynamic dispatch;
/// a bare subscript or a direct call is not.
#[test]
fn contains_dynamic_dispatch_detects_registry_indexed_call() {
    let indexed_call = contains_dynamic_dispatch(" return registry[key]()");
    assert!(indexed_call);
    for line in [" return registry[key]", " return notify()"] {
        assert!(!contains_dynamic_dispatch(line));
    }
}
/// Empty text, a bare name, and a name followed by an unclosed ` (` must not
/// look like call expressions.
#[test]
fn looks_like_call_expression_rejects_text_without_parens() {
    let not_calls = ["", "name", "name ("];
    for text in not_calls {
        assert!(!looks_like_call_expression(text));
    }
}
/// A finding for a decorated owner must carry an `owner_decorators: ` evidence
/// line that names the `retry` decorator.
#[test]
fn classify_change_emits_decorator_evidence_when_owner_has_decorator() -> Result<(), String> {
    let production = Path::new("src/service.py");
    let owners = extract_owners(production, "@retry(times=3)\ndef total():\n return 1\n");
    let tests = extract_tests(
        Path::new("tests/test_service.py"),
        "def test_total():\n assert total() == 1\n",
    );
    let finding = match classify_change(production, 2, " return 1", &owners, &tests) {
        Some(finding) => finding,
        None => return Err("expected a finding".to_string()),
    };

    let evidence_joined = finding.evidence.join("\n");
    if !evidence_joined.contains("owner_decorators: ") {
        Err(format!(
            "expected owner_decorators evidence line, got: {evidence_joined}"
        ))
    } else if !evidence_joined.contains("retry") {
        Err(format!(
            "expected `retry` decorator to be listed, got: {evidence_joined}"
        ))
    } else {
        Ok(())
    }
}
/// A call to an unrecognized function must not be recorded as an assertion.
#[test]
fn oracle_for_call_returns_none_for_unknown_callable() -> Result<(), String> {
    let tests = extract_tests(
        Path::new("tests/test_unknown.py"),
        "def test_unknown_call():\n something_random(payload)\n",
    );
    match tests.as_slice() {
        [only] if only.assertions.is_empty() => Ok(()),
        [only] => Err(format!(
            "non-oracle calls must not register as assertions, got {:?}",
            only.assertions
        )),
        other => Err(format!("expected single test, got {}", other.len())),
    }
}
/// Expression statements that are not calls must not be recorded as
/// assertions.
#[test]
fn assertion_from_expr_returns_none_for_non_call_expressions() -> Result<(), String> {
    let tests = extract_tests(
        Path::new("tests/test_bare.py"),
        "def test_bare_expression():\n value\n other\n",
    );
    match tests.as_slice() {
        [only] if only.assertions.is_empty() => Ok(()),
        [only] => Err(format!(
            "expression statements without calls must not assert, got {:?}",
            only.assertions
        )),
        other => Err(format!("expected single test, got {}", other.len())),
    }
}
/// Visiting a directory that cannot be read must leave the output list empty
/// rather than failing.
#[test]
fn visit_workspace_returns_silently_when_directory_is_unreadable() {
    let bogus = Path::new("/definitely-not-a-real-dir-ripr");
    let mut out = Vec::new();
    // The same path is passed for both path arguments.
    visit_workspace(bogus, bogus, &mut out);
    assert!(out.is_empty());
}
/// An `async def test_*` method inside a `unittest.TestCase` subclass must be
/// extracted and tagged with the `unittest` framework.
#[test]
fn async_test_inside_unittest_class_is_marked_as_unittest_framework() -> Result<(), String> {
    let source = r#"
import unittest
class Async(unittest.TestCase):
async def test_async_path(self):
self.assertEqual(await compute(), 1)
"#;
    let tests = extract_tests(Path::new("tests/test_async_unittest.py"), source);
    let async_test = match tests.iter().find(|test| test.name == "test_async_path") {
        Some(found) => found,
        None => {
            return Err(format!(
                "expected `test_async_path`, got names {:?}",
                tests.iter().map(|t| t.name.as_str()).collect::<Vec<_>>()
            ));
        }
    };
    if async_test.framework == "unittest" {
        Ok(())
    } else {
        Err(format!(
            "async test inside unittest.TestCase should be unittest, got {}",
            async_test.framework
        ))
    }
}
/// A subscripted decorator (`parametrize[int](...)`) must not be recorded
/// under a name containing `parametrize`. If no test is extracted at all, the
/// check passes without inspecting anything.
#[test]
fn expr_full_name_returns_none_for_unsupported_decorator_shapes() {
    let source = r#"
import pytest
@pytest.mark.parametrize[int]("amount", [1])
def test_apply_discount(amount):
apply_discount(amount)
"#;
    let tests = extract_tests(Path::new("tests/test_unsupported_decorator.py"), source);
    match tests.first() {
        None => {}
        Some(test) => {
            let none_mention_parametrize = test
                .decorators
                .iter()
                .all(|decorator| !decorator.contains("parametrize"));
            assert!(
                none_mention_parametrize,
                "subscript decorator shape should not yield a parametrize name; got {:?}",
                test.decorators
            );
        }
    }
}
/// Attribute access on an import's alias (`log.warn(...)`) counts as using the
/// imported symbol; a line that never mentions the alias does not.
#[test]
fn line_uses_imported_symbol_matches_attribute_access_on_imported_alias() {
    let imports = vec![PythonImport {
        imported: "logger".to_string(),
        alias: "log".to_string(),
    }];
    let uses_import = |line: &str| line_uses_imported_symbol(line, &imports);
    assert!(uses_import(" log.warn(\"problem\")"));
    assert!(!uses_import(" unrelated"));
}
/// An assignment whose right-hand side is not a call must classify as
/// `Predicate` / `Control`.
#[test]
fn classify_probe_shape_assign_with_non_call_rhs_falls_through_to_predicate() {
    let shape = classify_probe_shape(" total = amount + 10");
    let (family, delta) = shape;
    assert_eq!(family, ProbeFamily::Predicate);
    assert_eq!(delta, DeltaKind::Control);
}
/// `analyze_diff` must still analyze a changed Python file when the workspace
/// also holds a `.py`-named entry that cannot be read as a file.
///
/// Expects one accepted changed file and a single `NoStaticPath` finding.
#[test]
fn analyze_diff_counts_python_file_but_skips_unreadable_workspace_source() -> Result<(), String> {
let root = unique_tempdir("analyze-diff-unreadable")?;
write_file(&root.join("src/keep.py"), "def keep():\n return 1\n")?;
// A *directory* named like a Python source file stands in for the unreadable
// workspace source.
let unreadable = root.join("src/looks_like_source.py");
std::fs::create_dir_all(&unreadable)
.map_err(|err| format!("create_dir_all({}): {err}", unreadable.display()))?;
let adapter = PythonAdapter;
let options = AnalysisOptions {
root: root.clone(),
base: None,
diff_file: None,
mode: crate::analysis::AnalysisMode::Draft,
include_unchanged_tests: false,
};
let policy = OraclePolicy::default();
// Only the readable production file appears in the diff.
let changed_files = vec![ChangedFile {
path: PathBuf::from("src/keep.py"),
added_lines: vec![crate::analysis::diff::ChangedLine {
line: 2,
text: " return 1".to_string(),
}],
removed_lines: Vec::new(),
}];
let result = adapter.analyze_diff(&options, &policy, &changed_files);
// Remove the temp directory before propagating the analysis error, so a
// failing analysis does not leave the fixture behind.
let cleanup = std::fs::remove_dir_all(&root);
let result = result?;
cleanup.map_err(|err| format!("remove_dir_all({}): {err}", root.display()))?;
if result.changed_files != 1 {
return Err(format!(
"expected 1 accepted changed file, got {}",
result.changed_files
));
}
// No test file exists in this fixture; the single finding is expected to be
// classified `NoStaticPath`.
if result.findings.len() != 1 {
return Err(format!(
"expected one NoStaticPath finding, got {} findings",
result.findings.len()
));
}
if result.findings[0].class != ExposureClass::NoStaticPath {
return Err(format!(
"expected NoStaticPath, got {:?}",
result.findings[0].class
));
}
Ok(())
}