use std::path::{Path, PathBuf};
use anyhow::{anyhow, Context, Result};
use rustpython_ast::Visitor;
use rustpython_parser::ast::{
self, Arg, Arguments, Constant, Expr, ExprCall, StmtAssign, StmtAsyncFunctionDef,
StmtAugAssign, StmtDelete, StmtFunctionDef, StmtIf, StmtImport, StmtImportFrom, WithItem,
};
use rustpython_parser::text_size::{TextRange, TextSize};
use rustpython_parser::Parse;
pub use crate::violation::Violation;
/// Lints every Python test file under `root` and returns all violations found.
///
/// Files are selected by [`is_python_test_file`] (`conftest.py` and `*_test.py`),
/// visited in sorted path order, and checked with a [`LintVisitor`]. The result is
/// ordered by file path, then by line number.
///
/// # Errors
///
/// Fails if a directory cannot be listed, a test file cannot be read, or a test
/// file does not parse as Python.
pub fn find_violations(root: impl AsRef<Path>) -> Result<Vec<Violation>> {
    let root = root.as_ref();
    let package = first_party_package(root);

    let mut test_files = Vec::new();
    collect_python_files(root, &mut test_files, is_python_test_file)?;
    test_files.sort();

    let mut found = Vec::new();
    for path in &test_files {
        let text = std::fs::read_to_string(path)
            .with_context(|| format!("reading test file `{}`", path.display()))?;
        let parsed = match ast::Suite::parse(&text, &path.to_string_lossy()) {
            Ok(suite) => suite,
            Err(err) => return Err(anyhow!("parsing `{}`: {err}", path.display())),
        };

        let mut linter = LintVisitor {
            file: path,
            source: &text,
            fixture_depth: 0,
            first_party: package.as_deref(),
            violations: Vec::new(),
        };
        for stmt in parsed {
            linter.visit_stmt(stmt);
        }
        found.extend(linter.violations);
    }

    // Stable sort: violations on the same line keep the order they were reported in.
    found.sort_by(|lhs, rhs| lhs.file.cmp(&rhs.file).then(lhs.line.cmp(&rhs.line)));
    Ok(found)
}
/// Reports every collaborator a unit test imports without mocking it.
///
/// Only `*_test.py` files (see [`is_python_unit_test_file`]) are inspected. Each
/// file is scanned with a [`UnitIsolationVisitor`]; every recorded import that is
/// neither the unit under test nor covered by a string-target patch becomes an
/// `unmocked-collaborator` violation. When no first-party package name can be
/// determined for `root`, nothing is checked and an empty list is returned.
///
/// # Errors
///
/// Fails if a directory cannot be listed, a test file cannot be read, or a test
/// file does not parse as Python.
pub fn find_unit_isolation_violations(root: impl AsRef<Path>) -> Result<Vec<Violation>> {
    let root = root.as_ref();
    let first_party = match first_party_package(root) {
        Some(name) => name,
        None => return Ok(Vec::new()),
    };

    let mut test_files = Vec::new();
    collect_python_files(root, &mut test_files, is_python_unit_test_file)?;
    test_files.sort();

    let mut found = Vec::new();
    for path in &test_files {
        let text = std::fs::read_to_string(path)
            .with_context(|| format!("reading test file `{}`", path.display()))?;
        let parsed = match ast::Suite::parse(&text, &path.to_string_lossy()) {
            Ok(suite) => suite,
            Err(err) => return Err(anyhow!("parsing `{}`: {err}", path.display())),
        };

        let base = unit_under_test_base(path);
        let mut scanner = UnitIsolationVisitor {
            source: &text,
            first_party: &first_party,
            base: &base,
            type_checking_depth: 0,
            imports: Vec::new(),
            patch_targets: Vec::new(),
        };
        for stmt in parsed {
            scanner.visit_stmt(stmt);
        }

        // Patches are matched only after the whole file is scanned, so a patch
        // that appears below the import still counts.
        let offenders = scanner
            .imports
            .iter()
            .filter(|import| !import.is_uut && !import.is_mocked(&scanner.patch_targets));
        for import in offenders {
            found.push(Violation {
                file: path.to_path_buf(),
                line: import.line,
                rule: "unmocked-collaborator",
                message: format!(
                    "unit test imports `{}` without mocking it — a unit test isolates the \
                     unit under test, so mock every collaborator (patch it by string in a \
                     fixture)",
                    import.display
                ),
            });
        }
    }

    found.sort_by(|lhs, rhs| lhs.file.cmp(&rhs.file).then(lhs.line.cmp(&rhs.line)));
    Ok(found)
}
/// One import recorded from a unit-test file, kept until all patch targets in the
/// file are known.
struct ImportRecord {
    /// Dotted name shown in the violation message.
    display: String,
    /// 1-based line of the import statement.
    line: usize,
    /// Whether this import names the unit under test (never reported).
    is_uut: bool,
    /// Names bound by a `from ... import ...` statement; empty for `import x`.
    symbols: Vec<String>,
    /// Full module path for a plain `import x.y`; `None` for `from` imports.
    module: Option<String>,
}

impl ImportRecord {
    /// Returns `true` when one of `patch_targets` covers this import.
    ///
    /// An import is covered when a target's last dotted segment equals one of the
    /// imported symbols, or — for a plain module import — when a target is the
    /// module itself or a dotted path beneath it.
    fn is_mocked(&self, patch_targets: &[String]) -> bool {
        for target in patch_targets {
            let tail = target.rsplit('.').next().unwrap_or(target);
            if self.symbols.iter().any(|symbol| symbol == tail) {
                return true;
            }
        }

        let Some(module) = self.module.as_deref() else {
            return false;
        };
        // Require a `.` (or nothing) after the module so `pkg.dbx` does not match `pkg.db`.
        patch_targets.iter().any(|target| {
            target
                .strip_prefix(module)
                .is_some_and(|rest| rest.is_empty() || rest.starts_with('.'))
        })
    }
}
/// AST visitor that collects the imports and string patch targets of one unit-test
/// file so they can be matched against each other after the whole file is scanned.
struct UnitIsolationVisitor<'a> {
// Source text of the file; used to turn byte offsets into line numbers.
source: &'a str,
// Top-level package name treated as first-party when classifying imports.
first_party: &'a str,
// Name of the unit under test; imports whose last segment equals it are marked `is_uut`.
base: &'a str,
// Non-zero while visiting the body of an `if TYPE_CHECKING:` block; imports there are ignored.
type_checking_depth: usize,
// Imports that are subject to the isolation check, in visit order.
imports: Vec<ImportRecord>,
// String targets of every patch call seen in the file.
patch_targets: Vec<String>,
}
impl Visitor for UnitIsolationVisitor<'_> {
    /// Records each `import a.b` whose top-level package is subject to the check.
    fn visit_stmt_import(&mut self, node: StmtImport) {
        if self.type_checking_depth == 0 {
            let line = line_of(self.source, node.range.start());
            let first_party = self.first_party;
            let base = self.base;
            let records = node
                .names
                .iter()
                .map(|alias| alias.name.as_str())
                .filter(|module| is_checked_import(import_head(module), first_party))
                .map(|module| ImportRecord {
                    display: module.to_string(),
                    line,
                    is_uut: last_segment(module) == base,
                    symbols: Vec::new(),
                    module: Some(module.to_string()),
                });
            self.imports.extend(records);
        }
        self.generic_visit_stmt_import(node);
    }

    /// Records `from x import y` statements; relative imports are always checked.
    fn visit_stmt_import_from(&mut self, node: StmtImportFrom) {
        if self.type_checking_depth == 0 {
            let level = relative_level(&node);
            let module = node.module.as_ref().map(|m| m.as_str());
            let checked = match module {
                _ if level > 0 => true,
                Some(name) => is_checked_import(import_head(name), self.first_party),
                None => false,
            };
            if checked {
                let line = line_of(self.source, node.range.start());
                let dots = ".".repeat(level);
                if let Some(module) = module {
                    // `from pkg.mod import a, b`: one record carrying every imported symbol.
                    let symbols = node
                        .names
                        .iter()
                        .map(|alias| alias.name.to_string())
                        .collect();
                    self.imports.push(ImportRecord {
                        display: format!("{dots}{module}"),
                        line,
                        is_uut: last_segment(module) == self.base,
                        symbols,
                        module: None,
                    });
                } else {
                    // `from . import a, b`: each name is its own sibling module.
                    for alias in &node.names {
                        let name = alias.name.as_str();
                        self.imports.push(ImportRecord {
                            display: format!("{dots}{name}"),
                            line,
                            is_uut: name == self.base,
                            symbols: vec![name.to_string()],
                            module: None,
                        });
                    }
                }
            }
        }
        self.generic_visit_stmt_import_from(node);
    }

    /// Remembers the string target of every patch call.
    fn visit_expr_call(&mut self, node: ExprCall) {
        let target = if is_patch_call(&node) {
            patch_string_target(&node).map(str::to_owned)
        } else {
            None
        };
        self.patch_targets.extend(target);
        self.generic_visit_expr_call(node);
    }

    /// Tracks `if TYPE_CHECKING:` so imports in its body are skipped while the
    /// `else` branch is still checked.
    fn visit_stmt_if(&mut self, node: StmtIf) {
        if !is_type_checking(node.test.as_ref()) {
            self.generic_visit_stmt_if(node);
            return;
        }
        self.type_checking_depth += 1;
        for stmt in node.body {
            self.visit_stmt(stmt);
        }
        self.type_checking_depth -= 1;
        for stmt in node.orelse {
            self.visit_stmt(stmt);
        }
    }
}
/// Returns the top-level package of a dotted module path (the text before the
/// first `.`), or the whole string when it contains no dot.
fn import_head(module: &str) -> &str {
    module.split_once('.').map_or(module, |(head, _)| head)
}
/// Decides whether an import with top-level package `head` must be mocked.
///
/// Precedence, first match wins: the first-party package is checked; test
/// frameworks are exempt; effectful stdlib modules are checked; any other stdlib
/// module is exempt; everything else (third-party) is checked.
fn is_checked_import(head: &str, first_party: &str) -> bool {
    if head == first_party {
        true
    } else if TEST_FRAMEWORK.contains(&head) {
        false
    } else if EFFECTFUL_STDLIB.contains(&head) {
        true
    } else {
        !STDLIB_MODULES.contains(&head)
    }
}
/// Top-level packages that `is_checked_import` always exempts (unless the name is
/// the first-party package itself).
const TEST_FRAMEWORK: &[&str] = &["pytest", "_pytest", "mock"];
/// Standard-library modules that `is_checked_import` still requires to be mocked.
/// This list is consulted before `STDLIB_MODULES`, so an entry present in both is
/// treated as checked.
const EFFECTFUL_STDLIB: &[&str] = &[
"asynchat",
"asyncore",
"ctypes",
"curses",
"dbm",
"fcntl",
"ftplib",
"imaplib",
"mmap",
"msvcrt",
"multiprocessing",
"nis",
"nntplib",
"ossaudiodev",
"poplib",
"pty",
"random",
"secrets",
"select",
"selectors",
"signal",
"smtpd",
"smtplib",
"socket",
"socketserver",
"spwd",
"sqlite3",
"ssl",
"subprocess",
"syslog",
"telnetlib",
"termios",
"tty",
"webbrowser",
"winreg",
"winsound",
];
/// Top-level standard-library module names that `is_checked_import` exempts from
/// the isolation check, unless the name also appears in `EFFECTFUL_STDLIB` (which
/// is consulted first).
///
/// `__future__` is listed so that `from __future__ import annotations` is not
/// reported as an unmocked collaborator.
const STDLIB_MODULES: &[&str] = &[
    "__future__",
    "abc",
    "aifc",
    "antigravity",
    "argparse",
    "array",
    "ast",
    "asynchat",
    "asyncio",
    "asyncore",
    "atexit",
    "audioop",
    "base64",
    "bdb",
    "binascii",
    "bisect",
    "builtins",
    "bz2",
    "cProfile",
    "calendar",
    "cgi",
    "cgitb",
    "chunk",
    "cmath",
    "cmd",
    "code",
    "codecs",
    "codeop",
    "collections",
    "colorsys",
    "compileall",
    "concurrent",
    "configparser",
    "contextlib",
    "contextvars",
    "copy",
    "copyreg",
    "crypt",
    "csv",
    "ctypes",
    "curses",
    "dataclasses",
    "datetime",
    "dbm",
    "decimal",
    "difflib",
    "dis",
    "distutils",
    "doctest",
    "email",
    "encodings",
    "ensurepip",
    "enum",
    "errno",
    "faulthandler",
    "fcntl",
    "filecmp",
    "fileinput",
    "fnmatch",
    "fractions",
    "ftplib",
    "functools",
    "gc",
    "genericpath",
    "getopt",
    "getpass",
    "gettext",
    "glob",
    "graphlib",
    "grp",
    "gzip",
    "hashlib",
    "heapq",
    "hmac",
    "html",
    "http",
    "idlelib",
    "imaplib",
    "imghdr",
    "imp",
    "importlib",
    "inspect",
    "io",
    "ipaddress",
    "itertools",
    "json",
    "keyword",
    "lib2to3",
    "linecache",
    "locale",
    "logging",
    "lzma",
    "mailbox",
    "mailcap",
    "marshal",
    "math",
    "mimetypes",
    "mmap",
    "modulefinder",
    "msilib",
    "msvcrt",
    "multiprocessing",
    "netrc",
    "nis",
    "nntplib",
    "nt",
    "ntpath",
    "nturl2path",
    "numbers",
    "opcode",
    "operator",
    "optparse",
    "os",
    "ossaudiodev",
    "pathlib",
    "pdb",
    "pickle",
    "pickletools",
    "pipes",
    "pkgutil",
    "platform",
    "plistlib",
    "poplib",
    "posix",
    "posixpath",
    "pprint",
    "profile",
    "pstats",
    "pty",
    "pwd",
    "py_compile",
    "pyclbr",
    "pydoc",
    "pydoc_data",
    "pyexpat",
    "queue",
    "quopri",
    "random",
    "re",
    "readline",
    "reprlib",
    "resource",
    "rlcompleter",
    "runpy",
    "sched",
    "secrets",
    "select",
    "selectors",
    "shelve",
    "shlex",
    "shutil",
    "signal",
    "site",
    "smtpd",
    "smtplib",
    "sndhdr",
    "socket",
    "socketserver",
    "spwd",
    "sqlite3",
    "sre_compile",
    "sre_constants",
    "sre_parse",
    "ssl",
    "stat",
    "statistics",
    "string",
    "stringprep",
    "struct",
    "subprocess",
    "sunau",
    "symtable",
    "sys",
    "sysconfig",
    "syslog",
    "tabnanny",
    "tarfile",
    "telnetlib",
    "tempfile",
    "termios",
    "textwrap",
    "this",
    "threading",
    "time",
    "timeit",
    "tkinter",
    "token",
    "tokenize",
    "tomllib",
    "trace",
    "traceback",
    "tracemalloc",
    "tty",
    "turtle",
    "turtledemo",
    "types",
    "typing",
    "unicodedata",
    "unittest",
    "urllib",
    "uu",
    "uuid",
    "venv",
    "warnings",
    "wave",
    "weakref",
    "webbrowser",
    "winreg",
    "winsound",
    "wsgiref",
    "xdrlib",
    "xml",
    "xmlrpc",
    "zipapp",
    "zipfile",
    "zipimport",
    "zlib",
    "zoneinfo",
];
/// Returns the final component of a dotted module path (the text after the last
/// `.`), or the whole string when it contains no dot.
fn last_segment(module: &str) -> &str {
    module.rsplit_once('.').map_or(module, |(_, tail)| tail)
}
/// Returns the relative-import level of a `from` import as a `usize`, treating a
/// missing level as `0`.
fn relative_level(node: &StmtImportFrom) -> usize {
    match node.level {
        Some(level) => level.to_usize(),
        None => 0,
    }
}
/// Returns `true` when an `if` condition is the bare name `TYPE_CHECKING` or any
/// attribute access ending in `.TYPE_CHECKING` (such as `typing.TYPE_CHECKING`).
fn is_type_checking(test: &Expr) -> bool {
    let ident = match test {
        Expr::Name(name) => name.id.as_str(),
        Expr::Attribute(attr) => attr.attr.as_str(),
        _ => return false,
    };
    ident == "TYPE_CHECKING"
}
/// Derives the name of the unit under test from a test file's path by taking the
/// file name and stripping a trailing `.py` and then a trailing `_test`.
///
/// A path without a UTF-8 file name yields an empty string.
fn unit_under_test_base(file: &Path) -> String {
    let name = file.file_name().and_then(|os| os.to_str()).unwrap_or("");
    // Order matters: the extension goes first, then the `_test` marker.
    [".py", "_test"]
        .iter()
        .fold(name, |rest, suffix| rest.strip_suffix(*suffix).unwrap_or(rest))
        .to_owned()
}
/// AST visitor that applies the general test-lint rules to one Python test file
/// and accumulates the violations it finds.
struct LintVisitor<'a> {
// Path of the file being linted; copied into every violation.
file: &'a Path,
// Source text of the file; used to turn byte offsets into line numbers.
source: &'a str,
// Number of enclosing fixture-decorated functions; patch calls are "inline" when this is 0.
fixture_depth: usize,
// First-party package name, when one was found; enables the first-party patch rule.
first_party: Option<&'a str>,
// Violations reported so far, in visit order.
violations: Vec<Violation>,
}
impl LintVisitor<'_> {
    /// Appends a violation of `rule` located at the line where `range` starts.
    fn report(&mut self, range: TextRange, rule: &'static str, message: &str) {
        let line = line_of(self.source, range.start());
        let violation = Violation {
            file: self.file.to_path_buf(),
            line,
            rule,
            message: message.to_string(),
        };
        self.violations.push(violation);
    }

    /// Checks a function signature on entry and reports `no-monkeypatch` when any
    /// parameter is named `monkeypatch`.
    ///
    /// Returns `true` when one of `decorators` is a fixture decorator, so the
    /// caller can track fixture nesting.
    fn enter_function(&mut self, args: &Arguments, decorators: &[Expr], range: TextRange) -> bool {
        let listed = args
            .posonlyargs
            .iter()
            .chain(&args.args)
            .chain(&args.kwonlyargs)
            .any(|param| param.def.arg.as_str() == "monkeypatch");
        // `*monkeypatch` / `**monkeypatch` count as well.
        let starred = [&args.vararg, &args.kwarg]
            .iter()
            .any(|slot| arg_named(slot, "monkeypatch"));

        if listed || starred {
            self.report(
                range,
                "no-monkeypatch",
                "test takes pytest's `monkeypatch` fixture; patch with `unittest.mock` wrapped in a `pytest.fixture` instead",
            );
        }
        decorators.iter().any(is_fixture_decorator)
    }
}
impl Visitor for LintVisitor<'_> {
fn visit_stmt_function_def(&mut self, node: StmtFunctionDef) {
let is_fixture = self.enter_function(&node.args, &node.decorator_list, node.range);
if is_fixture {
self.fixture_depth += 1;
}
self.generic_visit_stmt_function_def(node);
if is_fixture {
self.fixture_depth -= 1;
}
}
fn visit_stmt_async_function_def(&mut self, node: StmtAsyncFunctionDef) {
let is_fixture = self.enter_function(&node.args, &node.decorator_list, node.range);
if is_fixture {
self.fixture_depth += 1;
}
self.generic_visit_stmt_async_function_def(node);
if is_fixture {
self.fixture_depth -= 1;
}
}
fn visit_expr_call(&mut self, node: ExprCall) {
let is_patch = is_patch_call(&node);
if is_patch && self.fixture_depth == 0 {
self.report(
node.range,
"no-inline-patch",
"patch is called inline in a test body; move it into a `pytest.fixture`",
);
}
if is_patch && patches_constant(&node) {
self.report(node.range, "no-constant-patch", CONSTANT_PATCH_MSG);
}
if is_patch {
if let Some(pkg) = self.first_party {
if patch_string_target(&node).is_some_and(|target| patches_first_party(target, pkg))
{
self.report(node.range, "no-first-party-patch", FIRST_PARTY_PATCH_MSG);
}
}
}
if is_environ_mutation_call(&node) {
self.report(node.range, "no-environ-mutation", ENVIRON_MUTATION_MSG);
}
self.generic_visit_expr_call(node);
}
fn visit_withitem(&mut self, node: WithItem) {
self.visit_expr(node.context_expr);
if let Some(optional_vars) = node.optional_vars {
self.visit_expr(*optional_vars);
}
}
fn visit_stmt_assign(&mut self, node: StmtAssign) {
if node.targets.iter().any(is_os_environ_subscript) {
self.report(node.range, "no-environ-mutation", ENVIRON_MUTATION_MSG);
}
self.generic_visit_stmt_assign(node);
}
fn visit_stmt_aug_assign(&mut self, node: StmtAugAssign) {
if is_os_environ_subscript(node.target.as_ref()) {
self.report(node.range, "no-environ-mutation", ENVIRON_MUTATION_MSG);
}
self.generic_visit_stmt_aug_assign(node);
}
fn visit_stmt_delete(&mut self, node: StmtDelete) {
if node.targets.iter().any(is_os_environ_subscript) {
self.report(node.range, "no-environ-mutation", ENVIRON_MUTATION_MSG);
}
self.generic_visit_stmt_delete(node);
}
}
/// Returns `true` when the optional `*args` / `**kwargs` parameter is present and
/// is called `name`.
fn arg_named(arg: &Option<Box<Arg>>, name: &str) -> bool {
    matches!(arg, Some(found) if found.arg.as_str() == name)
}
/// Returns `true` for a decorator that is `fixture` or ends in `.fixture`, with or
/// without a call: `@fixture`, `@pytest.fixture`, `@pytest.fixture(...)`.
fn is_fixture_decorator(decorator: &Expr) -> bool {
    // For `@x(...)`, look at what is being called rather than at the call itself.
    let callee = if let Expr::Call(call) = decorator {
        call.func.as_ref()
    } else {
        decorator
    };
    let ident = match callee {
        Expr::Name(name) => name.id.as_str(),
        Expr::Attribute(attr) => attr.attr.as_str(),
        _ => return false,
    };
    ident == "fixture"
}
/// Returns `true` when `call` invokes a patcher: `patch(...)`, `<anything>.patch(...)`,
/// or `.object(...)` / `.dict(...)` on something that is itself `patch` or ends in
/// `.patch`.
fn is_patch_call(call: &ExprCall) -> bool {
    match call.func.as_ref() {
        Expr::Name(name) => name.id.as_str() == "patch",
        Expr::Attribute(attr) => match attr.attr.as_str() {
            "patch" => true,
            "object" | "dict" => attr_base_is_patch(attr.value.as_ref()),
            _ => false,
        },
        _ => false,
    }
}
/// Returns `true` when `expr` is the name `patch` or an attribute access ending in
/// `.patch`.
fn attr_base_is_patch(expr: &Expr) -> bool {
    let ident = match expr {
        Expr::Name(name) => name.id.as_str(),
        Expr::Attribute(attr) => attr.attr.as_str(),
        _ => return false,
    };
    ident == "patch"
}
/// Message reported with the `no-constant-patch` rule.
const CONSTANT_PATCH_MSG: &str = "patches a module-global config constant; inject config explicitly (a consumer that did `from pkg import CONSTANT` snapshots the value at import time and ignores the patch)";
/// Message reported with the `no-first-party-patch` rule.
const FIRST_PARTY_PATCH_MSG: &str = "patches a first-party target; an integration test must run first-party code for real — only third-party packages and effectful stdlib may be patched";
/// Returns the dotted target of a patch call when it is given as a string literal.
///
/// The target is taken from the first positional argument, or — when the call has
/// no positional arguments — from the `target=` keyword, so that both
/// `patch("pkg.mod.attr")` and `patch(target="pkg.mod.attr")` are recognised.
/// Returns `None` when the target is absent or is not a string literal.
fn patch_string_target(call: &ExprCall) -> Option<&str> {
    let target_expr = call.args.first().or_else(|| {
        call.keywords
            .iter()
            .find(|keyword| {
                keyword
                    .arg
                    .as_ref()
                    .is_some_and(|name| name.as_str() == "target")
            })
            .map(|keyword| &keyword.value)
    })?;

    match target_expr {
        Expr::Constant(constant) => match &constant.value {
            Constant::Str(target) => Some(target.as_str()),
            _ => None,
        },
        _ => None,
    }
}
fn patches_constant(call: &ExprCall) -> bool {
patch_string_target(call)
.and_then(|target| target.rsplit('.').next())
.is_some_and(is_upper_constant)
}
/// Returns `true` when the first dotted segment of `target` is non-empty and
/// equals `pkg`.
fn patches_first_party(target: &str, pkg: &str) -> bool {
    let head = match target.split_once('.') {
        Some((head, _)) => head,
        None => target,
    };
    !head.is_empty() && head == pkg
}
/// Returns `true` when `name` consists only of ASCII uppercase letters, ASCII
/// digits and underscores, and contains at least one uppercase letter.
fn is_upper_constant(name: &str) -> bool {
    let mut saw_upper = false;
    for c in name.chars() {
        match c {
            'A'..='Z' => saw_upper = true,
            '0'..='9' | '_' => {}
            _ => return false,
        }
    }
    // Also rejects the empty string and names such as `_` or `123`.
    saw_upper
}
/// Message reported with the `no-environ-mutation` rule.
const ENVIRON_MUTATION_MSG: &str =
"os.environ is mutated directly; set env via `patch.dict(os.environ, {...})` instead";
/// Returns `true` when `expr` is exactly the attribute access `os.environ`.
fn is_os_environ(expr: &Expr) -> bool {
    let Expr::Attribute(attr) = expr else {
        return false;
    };
    if attr.attr.as_str() != "environ" {
        return false;
    }
    match attr.value.as_ref() {
        Expr::Name(base) => base.id.as_str() == "os",
        _ => false,
    }
}
/// Returns `true` when `expr` is a subscript on `os.environ`, i.e. `os.environ[...]`.
fn is_os_environ_subscript(expr: &Expr) -> bool {
    match expr {
        Expr::Subscript(sub) => is_os_environ(sub.value.as_ref()),
        _ => false,
    }
}
/// Returns `true` when `call` invokes a mutating method directly on `os.environ`,
/// such as `os.environ.update(...)`.
fn is_environ_mutation_call(call: &ExprCall) -> bool {
    match call.func.as_ref() {
        Expr::Attribute(attr) => {
            is_os_environ(attr.value.as_ref()) && is_environ_mutator(attr.attr.as_str())
        }
        _ => false,
    }
}
/// Returns `true` when `method` is one of the mapping methods treated as mutating
/// `os.environ`.
fn is_environ_mutator(method: &str) -> bool {
    ["update", "pop", "setdefault", "clear", "popitem"].contains(&method)
}
/// Converts a byte `offset` into `source` to a 1-based line number by counting the
/// newline bytes that precede it. An offset past the end is clamped to the end.
fn line_of(source: &str, offset: TextSize) -> usize {
    let end = source.len().min(u32::from(offset) as usize);
    let newlines = source
        .bytes()
        .take(end)
        .filter(|byte| *byte == b'\n')
        .count();
    newlines + 1
}
/// Finds the first-party package name for `root` from the nearest `pyproject.toml`.
///
/// Walks `root` and its ancestors. The first directory containing a
/// `pyproject.toml` decides the result: its normalised `[project].name`, or `None`
/// when that name cannot be read. The walk stops without a result at the first
/// directory that contains a `.git` entry.
fn first_party_package(root: &Path) -> Option<String> {
    for dir in root.ancestors() {
        let manifest = dir.join("pyproject.toml");
        if manifest.is_file() {
            // The nearest manifest is authoritative even when it has no project name.
            let name = read_project_name(&manifest)?;
            return Some(normalize_dist_name(&name));
        }
        if dir.join(".git").exists() {
            return None;
        }
    }
    None
}
/// Reads `[project].name` from the TOML file at `path`.
///
/// Returns `None` when the file cannot be read, is not valid TOML, or has no
/// string-valued `project.name`.
fn read_project_name(path: &Path) -> Option<String> {
    let text = std::fs::read_to_string(path).ok()?;
    let document = toml::from_str::<toml::Value>(&text).ok()?;
    let name = document
        .get("project")
        .and_then(|project| project.get("name"))
        .and_then(|name| name.as_str())?;
    Some(name.to_owned())
}
/// Turns a distribution name into the import-style name used for comparisons:
/// surrounding whitespace removed, ASCII letters lowercased, and every `-` or `.`
/// replaced with `_`.
fn normalize_dist_name(name: &str) -> String {
    name.trim()
        .chars()
        .map(|c| match c {
            '-' | '.' => '_',
            other => other.to_ascii_lowercase(),
        })
        .collect()
}
/// Recursively walks `dir` and appends to `out` every non-directory path accepted
/// by `is_match`.
///
/// # Errors
///
/// Fails if any directory in the tree cannot be listed or one of its entries
/// cannot be read.
fn collect_python_files(
    dir: &Path,
    out: &mut Vec<PathBuf>,
    is_match: fn(&Path) -> bool,
) -> Result<()> {
    let listing =
        std::fs::read_dir(dir).with_context(|| format!("reading directory `{}`", dir.display()))?;
    for entry in listing {
        let entry =
            entry.with_context(|| format!("reading an entry under `{}`", dir.display()))?;
        let path = entry.path();
        if path.is_dir() {
            collect_python_files(&path, out, is_match)?;
            continue;
        }
        if is_match(&path) {
            out.push(path);
        }
    }
    Ok(())
}
/// Returns `true` for files the general lint inspects: `conftest.py` and any file
/// whose name ends in `_test.py`.
fn is_python_test_file(path: &Path) -> bool {
    match path.file_name().and_then(|os| os.to_str()) {
        Some(name) => name == "conftest.py" || name.ends_with("_test.py"),
        None => false,
    }
}
/// Returns `true` for files the unit-isolation check inspects: any file whose name
/// ends in `_test.py` (`conftest.py` is not included).
fn is_python_unit_test_file(path: &Path) -> bool {
    path.file_name()
        .and_then(|os| os.to_str())
        .is_some_and(|name| name.ends_with("_test.py"))
}
// Unit tests for the lint helpers and for both entry points.
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicU64, Ordering};
// Scratch directory under the system temp dir, removed again on drop.
struct TempDir(PathBuf);
impl TempDir {
// Creates a fresh directory named after the process id and a per-process counter.
fn new() -> Self {
static COUNTER: AtomicU64 = AtomicU64::new(0);
let dir = std::env::temp_dir().join(format!(
"tc-lint-{}-{}",
std::process::id(),
COUNTER.fetch_add(1, Ordering::Relaxed),
));
std::fs::create_dir_all(&dir).unwrap();
TempDir(dir)
}
// Writes `contents` to `name` inside the directory, creating parent directories as needed.
fn write(&self, name: &str, contents: &str) {
let path = self.0.join(name);
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent).unwrap();
}
std::fs::write(path, contents).unwrap();
}
}
impl Drop for TempDir {
fn drop(&mut self) {
// Best-effort cleanup; a failure to delete is ignored.
let _ = std::fs::remove_dir_all(&self.0);
}
}
// Distribution names are trimmed, lowercased, and have `-` / `.` turned into `_`.
#[test]
fn normalize_dist_name_maps_to_import_name() {
assert_eq!(normalize_dist_name("My-Project"), "my_project");
assert_eq!(normalize_dist_name("ns.pkg"), "ns_pkg");
assert_eq!(normalize_dist_name(" myproject "), "myproject");
assert_eq!(normalize_dist_name("myproject"), "myproject");
}
// Parses a one-statement Python snippet and returns the call expression it consists of.
fn parse_call(src: &str) -> ExprCall {
let suite = ast::Suite::parse(src, "t.py").expect("snippet should parse");
match suite.into_iter().next().expect("one statement") {
ast::Stmt::Expr(stmt) => match *stmt.value {
Expr::Call(call) => call,
other => panic!("expected a call, got {other:?}"),
},
other => panic!("expected an expression statement, got {other:?}"),
}
}
// Only a string literal in first position counts as a patch target.
#[test]
fn patch_string_target_only_reads_string_literals() {
let str_call = parse_call("patch(\"pkg.mod.attr\")\n");
assert_eq!(patch_string_target(&str_call), Some("pkg.mod.attr"));
let int_call = parse_call("patch(42)\n");
assert_eq!(patch_string_target(&int_call), None);
let name_call = parse_call("patch(target)\n");
assert_eq!(patch_string_target(&name_call), None);
let empty_call = parse_call("patch()\n");
assert_eq!(patch_string_target(&empty_call), None);
}
// The first dotted segment must equal the package exactly and be non-empty.
#[test]
fn patches_first_party_matches_head_segment() {
assert!(patches_first_party("myproject.ledger.record", "myproject"));
assert!(patches_first_party("myproject", "myproject"));
assert!(!patches_first_party("requests.get", "myproject"));
assert!(!patches_first_party("myproject_extra.x", "myproject"));
assert!(!patches_first_party("", "myproject"));
assert!(!patches_first_party(".leading", "myproject"));
}
// The project name is read from `pyproject.toml` and normalised.
#[test]
fn first_party_package_reads_pyproject_name() {
let tree = TempDir::new();
tree.write(
"pyproject.toml",
"[project]\nname = \"My-Project\"\nversion = \"0.0.0\"\n",
);
assert_eq!(first_party_package(&tree.0).as_deref(), Some("my_project"));
}
// A `pyproject.toml` without `[project].name` yields no package.
#[test]
fn first_party_package_is_none_without_a_project_name() {
let tree = TempDir::new();
tree.write("pyproject.toml", "[build-system]\nrequires = []\n");
tree.write(".git", "");
assert_eq!(first_party_package(&tree.0), None);
}
// An empty temp directory yields no package.
#[test]
fn first_party_package_is_none_when_absent() {
let tree = TempDir::new();
assert_eq!(first_party_package(&tree.0), None);
}
// Runs the isolation visitor over `source` and returns the display names of the
// imports that are neither the unit under test nor mocked.
fn unmocked(base: &str, first_party: &str, source: &str) -> Vec<String> {
let suite = ast::Suite::parse(source, "t.py").expect("snippet should parse");
let mut visitor = UnitIsolationVisitor {
source,
first_party,
base,
type_checking_depth: 0,
imports: Vec::new(),
patch_targets: Vec::new(),
};
for stmt in suite {
visitor.visit_stmt(stmt);
}
visitor
.imports
.iter()
.filter(|i| !i.is_uut && !i.is_mocked(&visitor.patch_targets))
.map(|i| i.display.clone())
.collect()
}
// First and last dotted segments of a module path.
#[test]
fn import_head_and_last_segment() {
assert_eq!(import_head("myproject.db.conn"), "myproject");
assert_eq!(import_head("requests"), "requests");
assert_eq!(last_segment("myproject.db.conn"), "conn");
assert_eq!(last_segment("widget"), "widget");
}
// Only a trailing `_test` is stripped; a `test_` prefix is kept.
#[test]
fn unit_under_test_base_strips_test_suffix() {
assert_eq!(
unit_under_test_base(Path::new("pkg/widget_test.py")),
"widget"
);
assert_eq!(
unit_under_test_base(Path::new("test_widget.py")),
"test_widget"
);
assert_eq!(unit_under_test_base(Path::new("plain.py")), "plain");
}
// Unit-test files are `*_test.py` only; `conftest.py` is excluded.
#[test]
fn recognizes_python_unit_test_files() {
assert!(is_python_unit_test_file(Path::new("widget_test.py")));
assert!(is_python_unit_test_file(Path::new("pkg/widget_test.py")));
assert!(!is_python_unit_test_file(Path::new("test_widget.py")));
assert!(!is_python_unit_test_file(Path::new("conftest.py")));
assert!(!is_python_unit_test_file(Path::new("widget.py")));
}
// `is_mocked` matches on an imported symbol or on a module prefix at a dot boundary.
#[test]
fn is_mocked_matches_symbol_last_segment_and_module_prefix() {
let symbol = ImportRecord {
display: "myproject.ledger".into(),
line: 1,
is_uut: false,
symbols: vec!["record".into()],
module: None,
};
assert!(symbol.is_mocked(&["myproject.widget.record".into()]));
assert!(symbol.is_mocked(&["myproject.ledger.record".into()]));
assert!(!symbol.is_mocked(&["myproject.widget.other".into()]));
let module = ImportRecord {
display: "myproject.db".into(),
line: 1,
is_uut: false,
symbols: Vec::new(),
module: Some("myproject.db".into()),
};
assert!(module.is_mocked(&["myproject.db.conn".into()])); assert!(module.is_mocked(&["myproject.db".into()])); assert!(!module.is_mocked(&["myproject.dbx.y".into()])); }
// The unit under test is skipped; other first-party and third-party imports are flagged.
#[test]
fn visitor_flags_first_party_and_external_collaborators() {
let found = unmocked(
"widget",
"myproject",
"from myproject.widget import build\n\
from myproject.ledger import record\n\
import requests\n",
);
assert_eq!(
found,
vec!["myproject.ledger".to_string(), "requests".to_string()]
);
}
// A patch whose last segment names the imported symbol clears the import.
#[test]
fn visitor_clears_a_mocked_collaborator() {
let found = unmocked(
"widget",
"myproject",
"from myproject.ledger import record\npatch(\"myproject.widget.record\")\n",
);
assert!(found.is_empty(), "got: {found:?}");
}
// Plain module imports and relative imports are both recorded.
#[test]
fn visitor_handles_module_and_relative_imports() {
assert_eq!(
unmocked("widget", "myproject", "import myproject.db\n"),
vec!["myproject.db".to_string()]
);
assert!(unmocked(
"widget",
"myproject",
"import myproject.db\npatch(\"myproject.db.connect\")\n"
)
.is_empty());
assert_eq!(
unmocked("widget", "myproject", "from .ledger import record\n"),
vec![".ledger".to_string()]
);
assert_eq!(
unmocked(
"widget",
"myproject",
"from . import ledger\nfrom . import widget\n"
),
vec![".ledger".to_string()]
);
}
// Imports under `if TYPE_CHECKING:` are ignored; the `else` branch is still checked.
#[test]
fn visitor_skips_type_checking_imports() {
let found = unmocked(
"widget",
"myproject",
"if TYPE_CHECKING:\n from myproject.models import Widget\nelse:\n from myproject.ledger import record\n",
);
assert_eq!(found, vec!["myproject.ledger".to_string()]);
}
// Classification by origin: first-party, test framework, effectful stdlib, pure stdlib, third-party.
#[test]
fn is_checked_import_classifies_origins() {
assert!(is_checked_import("myproject", "myproject")); assert!(!is_checked_import("pytest", "myproject")); assert!(!is_checked_import("_pytest", "myproject"));
assert!(is_checked_import("subprocess", "myproject")); assert!(is_checked_import("socket", "myproject"));
assert!(!is_checked_import("json", "myproject")); assert!(!is_checked_import("dataclasses", "myproject"));
assert!(is_checked_import("requests", "myproject")); assert!(is_checked_import("stripe", "myproject"));
assert!(!is_checked_import("os", "myproject"));
assert!(!is_checked_import("pathlib", "myproject"));
assert!(!is_checked_import("datetime", "myproject"));
}
// Third-party and effectful stdlib imports are flagged; `json` and `pytest` are not.
#[test]
fn visitor_flags_external_collaborators() {
let found = unmocked(
"widget",
"myproject",
"import requests\nimport subprocess\nimport json\nimport pytest\n",
);
assert_eq!(found.len(), 2, "got: {found:?}");
assert!(found.contains(&"requests".to_string()));
assert!(found.contains(&"subprocess".to_string()));
}
// `typing.TYPE_CHECKING` is recognised too; an ordinary `if` does not hide imports.
#[test]
fn visitor_type_checking_variants_and_plain_if() {
assert!(unmocked(
"widget",
"myproject",
"if typing.TYPE_CHECKING:\n from myproject.models import W\n import myproject.db\n"
)
.is_empty());
assert_eq!(
unmocked(
"widget",
"myproject",
"if ready == 1:\n from myproject.ledger import record\n"
),
vec!["myproject.ledger".to_string()]
);
}
// Without a `pyproject.toml` the isolation check reports nothing.
#[test]
fn find_unit_isolation_without_pyproject_reports_nothing() {
let tree = TempDir::new();
tree.write("widget_test.py", "from myproject.ledger import record\n");
tree.write(".git", "");
assert!(find_unit_isolation_violations(&tree.0)
.expect("a readable tree should succeed")
.is_empty());
}
// Test files in subdirectories are found and their unmocked imports reported.
#[test]
fn find_unit_isolation_walks_subdirs_and_flags() {
let tree = TempDir::new();
tree.write("pyproject.toml", "[project]\nname = \"myproject\"\n");
tree.write("pkg/thing_test.py", "from myproject.ledger import record\n");
let found =
find_unit_isolation_violations(&tree.0).expect("a readable tree should succeed");
assert_eq!(found.len(), 1, "got: {found:?}");
assert_eq!(found[0].rule, "unmocked-collaborator");
assert!(found[0].message.contains("myproject.ledger"));
}
// The general lint covers `*_test.py` and `conftest.py`.
#[test]
fn recognizes_python_test_files() {
assert!(is_python_test_file(Path::new("widget_test.py")));
assert!(is_python_test_file(Path::new("pkg/widget_test.py")));
assert!(is_python_test_file(Path::new("conftest.py")));
assert!(!is_python_test_file(Path::new("test_widget.py")));
}
// Anything else is not a test file.
#[test]
fn ignores_non_test_files() {
assert!(!is_python_test_file(Path::new("widget.py")));
assert!(!is_python_test_file(Path::new("conftest.pyi")));
assert!(!is_python_test_file(Path::new("README.md")));
assert!(!is_python_test_file(Path::new("testing.py")));
}
// Line numbers are 1-based and advance after each newline.
#[test]
fn line_of_counts_newlines() {
let src = "a\nb\nc\n";
assert_eq!(line_of(src, TextSize::from(0)), 1);
assert_eq!(line_of(src, TextSize::from(2)), 2);
assert_eq!(line_of(src, TextSize::from(4)), 3);
}
// Mutating mapping methods are recognised; read-only ones are not.
#[test]
fn recognizes_environ_mutators() {
assert!(is_environ_mutator("update"));
assert!(is_environ_mutator("pop"));
assert!(is_environ_mutator("clear"));
assert!(!is_environ_mutator("get"));
assert!(!is_environ_mutator("keys"));
}
// `UPPER_CASE` names need at least one uppercase letter and no lowercase letters.
#[test]
fn recognizes_upper_constants() {
assert!(is_upper_constant("CACHE_DIR"));
assert!(is_upper_constant("DEBUG"));
assert!(is_upper_constant("MAX_2"));
assert!(!is_upper_constant("cache_dir"));
assert!(!is_upper_constant("CacheDir"));
assert!(!is_upper_constant("fetch"));
assert!(!is_upper_constant(""));
assert!(!is_upper_constant("_"));
assert!(!is_upper_constant("123"));
}
}