use std::path::{Path, PathBuf};
use anyhow::{anyhow, Context, Result};
use rustpython_ast::Visitor;
use rustpython_parser::ast::{
self, Arg, Arguments, Constant, Expr, ExprCall, StmtAssign, StmtAsyncFunctionDef,
StmtAugAssign, StmtDelete, StmtFunctionDef, WithItem,
};
use rustpython_parser::text_size::{TextRange, TextSize};
use rustpython_parser::Parse;
/// One finding produced by the linter for a Python test file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Violation {
    /// Path of the file the finding was reported in.
    pub file: PathBuf,
    /// 1-based line number of the offending construct.
    pub line: usize,
    /// Short identifier of the rule that was broken (for example `no-monkeypatch`).
    pub rule: &'static str,
    /// Human-readable explanation, including the suggested alternative.
    pub message: String,
}
/// Lints every Python test file found under `root` and returns the findings.
///
/// Files are discovered recursively, processed in sorted path order, and the
/// resulting violations are ordered by file and then by line.
///
/// # Errors
///
/// Fails if a directory cannot be listed, a test file cannot be read as text,
/// or a test file cannot be parsed as Python. The first such failure aborts
/// the whole run.
pub fn find_violations(root: impl AsRef<Path>) -> Result<Vec<Violation>> {
    let mut test_files = Vec::new();
    collect_python_test_files(root.as_ref(), &mut test_files)?;
    test_files.sort();

    let mut found = Vec::new();
    for path in &test_files {
        let text = std::fs::read_to_string(path)
            .with_context(|| format!("reading test file `{}`", path.display()))?;
        let module = ast::Suite::parse(&text, &path.to_string_lossy())
            .map_err(|err| anyhow!("parsing `{}`: {err}", path.display()))?;

        let mut linter = LintVisitor {
            file: path,
            source: &text,
            fixture_depth: 0,
            violations: Vec::new(),
        };
        module
            .into_iter()
            .for_each(|stmt| linter.visit_stmt(stmt));
        found.extend(linter.violations);
    }

    // Stable sort: findings that share a file and line keep their visit order.
    found.sort_by(|lhs, rhs| (&lhs.file, lhs.line).cmp(&(&rhs.file, rhs.line)));
    Ok(found)
}
/// Per-file traversal state used while walking one parsed test module.
struct LintVisitor<'a> {
    /// Path of the file being linted; copied into every reported violation.
    file: &'a Path,
    /// Full text of the file, used to turn byte offsets into line numbers.
    source: &'a str,
    /// Number of enclosing function definitions that are decorated as fixtures.
    fixture_depth: usize,
    /// Findings collected so far for this file.
    violations: Vec<Violation>,
}
impl LintVisitor<'_> {
fn report(&mut self, range: TextRange, rule: &'static str, message: &str) {
self.violations.push(Violation {
file: self.file.to_path_buf(),
line: line_of(self.source, range.start()),
rule,
message: message.to_string(),
});
}
fn enter_function(&mut self, args: &Arguments, decorators: &[Expr], range: TextRange) -> bool {
let takes_monkeypatch = args
.posonlyargs
.iter()
.chain(&args.args)
.chain(&args.kwonlyargs)
.any(|arg| arg.def.arg.as_str() == "monkeypatch")
|| arg_named(&args.vararg, "monkeypatch")
|| arg_named(&args.kwarg, "monkeypatch");
if takes_monkeypatch {
self.report(
range,
"no-monkeypatch",
"test takes pytest's `monkeypatch` fixture; patch with `unittest.mock` wrapped in a `pytest.fixture` instead",
);
}
decorators.iter().any(is_fixture_decorator)
}
}
impl Visitor for LintVisitor<'_> {
fn visit_stmt_function_def(&mut self, node: StmtFunctionDef) {
let is_fixture = self.enter_function(&node.args, &node.decorator_list, node.range);
if is_fixture {
self.fixture_depth += 1;
}
self.generic_visit_stmt_function_def(node);
if is_fixture {
self.fixture_depth -= 1;
}
}
fn visit_stmt_async_function_def(&mut self, node: StmtAsyncFunctionDef) {
let is_fixture = self.enter_function(&node.args, &node.decorator_list, node.range);
if is_fixture {
self.fixture_depth += 1;
}
self.generic_visit_stmt_async_function_def(node);
if is_fixture {
self.fixture_depth -= 1;
}
}
fn visit_expr_call(&mut self, node: ExprCall) {
let is_patch = is_patch_call(&node);
if is_patch && self.fixture_depth == 0 {
self.report(
node.range,
"no-inline-patch",
"patch is called inline in a test body; move it into a `pytest.fixture`",
);
}
if is_patch && patches_constant(&node) {
self.report(node.range, "no-constant-patch", CONSTANT_PATCH_MSG);
}
if is_environ_mutation_call(&node) {
self.report(node.range, "no-environ-mutation", ENVIRON_MUTATION_MSG);
}
self.generic_visit_expr_call(node);
}
fn visit_withitem(&mut self, node: WithItem) {
self.visit_expr(node.context_expr);
if let Some(optional_vars) = node.optional_vars {
self.visit_expr(*optional_vars);
}
}
fn visit_stmt_assign(&mut self, node: StmtAssign) {
if node.targets.iter().any(is_os_environ_subscript) {
self.report(node.range, "no-environ-mutation", ENVIRON_MUTATION_MSG);
}
self.generic_visit_stmt_assign(node);
}
fn visit_stmt_aug_assign(&mut self, node: StmtAugAssign) {
if is_os_environ_subscript(node.target.as_ref()) {
self.report(node.range, "no-environ-mutation", ENVIRON_MUTATION_MSG);
}
self.generic_visit_stmt_aug_assign(node);
}
fn visit_stmt_delete(&mut self, node: StmtDelete) {
if node.targets.iter().any(is_os_environ_subscript) {
self.report(node.range, "no-environ-mutation", ENVIRON_MUTATION_MSG);
}
self.generic_visit_stmt_delete(node);
}
}
/// Returns `true` when the optional variadic parameter exists and is called `name`.
fn arg_named(arg: &Option<Box<Arg>>, name: &str) -> bool {
    match arg {
        Some(param) => param.arg.as_str() == name,
        None => false,
    }
}
/// Returns `true` when a decorator expression refers to something named `fixture`.
///
/// Both the bare form (`@fixture`, `@pytest.fixture`) and the called form
/// (`@pytest.fixture(...)`) are accepted; only the final name is compared.
fn is_fixture_decorator(decorator: &Expr) -> bool {
    // For a called decorator, look at what is being called.
    let callee = if let Expr::Call(call) = decorator {
        call.func.as_ref()
    } else {
        decorator
    };
    let terminal = match callee {
        Expr::Name(name) => name.id.as_str(),
        Expr::Attribute(attr) => attr.attr.as_str(),
        _ => return false,
    };
    terminal == "fixture"
}
/// Returns `true` when `call` invokes a `unittest.mock`-style patcher.
///
/// Recognized callees are a bare `patch(...)`, any attribute access ending in
/// `.patch(...)`, and the `patch.object(...)`, `patch.dict(...)` and
/// `patch.multiple(...)` variants, where `patch` itself may be a bare name or
/// the last segment of an attribute chain. Matching is purely by name.
fn is_patch_call(call: &ExprCall) -> bool {
    match call.func.as_ref() {
        Expr::Name(name) => name.id.as_str() == "patch",
        Expr::Attribute(attr) => match attr.attr.as_str() {
            "patch" => true,
            // Patcher variants only count when hanging off something named `patch`.
            "object" | "dict" | "multiple" => attr_base_is_patch(attr.value.as_ref()),
            _ => false,
        },
        _ => false,
    }
}
/// Returns `true` when `expr` is the name `patch` or an attribute access ending in `.patch`.
fn attr_base_is_patch(expr: &Expr) -> bool {
    let trailing = match expr {
        Expr::Name(name) => name.id.as_str(),
        Expr::Attribute(attr) => attr.attr.as_str(),
        _ => return false,
    };
    trailing == "patch"
}
// Message attached to every `no-constant-patch` violation.
const CONSTANT_PATCH_MSG: &str = "patches a module-global config constant; inject config explicitly (a consumer that did `from pkg import CONSTANT` snapshots the value at import time and ignores the patch)";
/// Returns `true` when a patch call names an `UPPER_CASE` constant as its target.
///
/// The patched name is read from a string literal: the first positional
/// argument normally (`patch("pkg.mod.CONSTANT")`), or the second one for the
/// `.object(target, "CONSTANT", ...)` form, whose first argument is the owning
/// object. Only the segment after the last `.` is tested. Non-literal
/// arguments are never treated as constants.
///
/// The call is not re-checked for being a patcher; the visitor only calls
/// this after `is_patch_call` has returned `true`.
fn patches_constant(call: &ExprCall) -> bool {
    let name_position = match call.func.as_ref() {
        Expr::Attribute(attr) if attr.attr.as_str() == "object" => 1,
        _ => 0,
    };
    let Some(Expr::Constant(constant)) = call.args.get(name_position) else {
        return false;
    };
    let Constant::Str(target) = &constant.value else {
        return false;
    };
    target.rsplit('.').next().is_some_and(is_upper_constant)
}
/// Returns `true` when `name` looks like a constant identifier.
///
/// Every character must be an ASCII uppercase letter, an ASCII digit or `_`,
/// and at least one uppercase letter must be present, so `""`, `"_"` and
/// `"123"` are rejected.
fn is_upper_constant(name: &str) -> bool {
    let mut saw_upper = false;
    for ch in name.chars() {
        match ch {
            'A'..='Z' => saw_upper = true,
            '0'..='9' | '_' => {}
            _ => return false,
        }
    }
    saw_upper
}
// Message attached to every `no-environ-mutation` violation, whether it comes
// from a mutating method call, an assignment, an augmented assignment or a `del`.
const ENVIRON_MUTATION_MSG: &str =
"os.environ is mutated directly; set env via `patch.dict(os.environ, {...})` instead";
/// Returns `true` when `expr` is literally the attribute access `os.environ`.
///
/// Aliased imports (for example `from os import environ`) are not recognized.
fn is_os_environ(expr: &Expr) -> bool {
    let Expr::Attribute(attr) = expr else {
        return false;
    };
    if attr.attr.as_str() != "environ" {
        return false;
    }
    match attr.value.as_ref() {
        Expr::Name(base) => base.id.as_str() == "os",
        _ => false,
    }
}
/// Returns `true` when `expr` is an item access on `os.environ`, i.e. `os.environ[...]`.
fn is_os_environ_subscript(expr: &Expr) -> bool {
    match expr {
        Expr::Subscript(sub) => is_os_environ(sub.value.as_ref()),
        _ => false,
    }
}
/// Returns `true` when `call` invokes a mutating method directly on `os.environ`,
/// such as `os.environ.update(...)`.
fn is_environ_mutation_call(call: &ExprCall) -> bool {
    let Expr::Attribute(method) = call.func.as_ref() else {
        return false;
    };
    is_os_environ(method.value.as_ref()) && is_environ_mutator(method.attr.as_str())
}
/// Returns `true` when `method` is one of the mapping methods that modify the
/// mapping in place: `update`, `pop`, `setdefault`, `clear` or `popitem`.
fn is_environ_mutator(method: &str) -> bool {
    const MUTATORS: [&str; 5] = ["update", "pop", "setdefault", "clear", "popitem"];
    MUTATORS.contains(&method)
}
/// Converts a byte offset into a 1-based line number.
///
/// The result is one more than the number of `\n` bytes that precede
/// `offset`. Offsets past the end of `source` are clamped to its length.
fn line_of(source: &str, offset: TextSize) -> usize {
    let end = source.len().min(u32::from(offset) as usize);
    let newlines_before = source
        .bytes()
        .take(end)
        .filter(|byte| *byte == b'\n')
        .count();
    newlines_before + 1
}
fn collect_python_test_files(dir: &Path, out: &mut Vec<PathBuf>) -> Result<()> {
let entries =
std::fs::read_dir(dir).with_context(|| format!("reading directory `{}`", dir.display()))?;
for entry in entries {
let path = entry
.with_context(|| format!("reading an entry under `{}`", dir.display()))?
.path();
if path.is_dir() {
collect_python_test_files(&path, out)?;
} else if is_python_test_file(&path) {
out.push(path);
}
}
Ok(())
}
/// Returns `true` when the final path component names a Python test file.
///
/// Accepted names are `conftest.py`, `*_test.py` and `test_*.py`. Paths with
/// no file name, or whose name is not valid UTF-8, are rejected.
fn is_python_test_file(path: &Path) -> bool {
    let Some(file_name) = path.file_name().and_then(|os_name| os_name.to_str()) else {
        return false;
    };
    if file_name == "conftest.py" {
        return true;
    }
    match file_name.strip_suffix(".py") {
        Some(stem) => stem.ends_with("_test") || stem.starts_with("test_"),
        None => false,
    }
}
/// Unit tests for the pure helpers; the AST-driven rules are not exercised here.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn recognizes_python_test_files() {
        assert!(is_python_test_file(Path::new("widget_test.py")));
        assert!(is_python_test_file(Path::new("pkg/widget_test.py")));
        assert!(is_python_test_file(Path::new("test_widget.py")));
        assert!(is_python_test_file(Path::new("conftest.py")));
        // Only the final component matters, for `conftest.py` as well.
        assert!(is_python_test_file(Path::new("pkg/conftest.py")));
    }

    #[test]
    fn ignores_non_test_files() {
        assert!(!is_python_test_file(Path::new("widget.py")));
        assert!(!is_python_test_file(Path::new("conftest.pyi")));
        assert!(!is_python_test_file(Path::new("README.md")));
        assert!(!is_python_test_file(Path::new("testing.py")));
        // Right prefix but wrong or missing extension.
        assert!(!is_python_test_file(Path::new("test_widget")));
        assert!(!is_python_test_file(Path::new("test_widget.pyc")));
        // A path without a file name is never a test file.
        assert!(!is_python_test_file(Path::new("")));
    }

    #[test]
    fn line_of_counts_newlines() {
        let src = "a\nb\nc\n";
        assert_eq!(line_of(src, TextSize::from(0)), 1);
        assert_eq!(line_of(src, TextSize::from(2)), 2);
        assert_eq!(line_of(src, TextSize::from(4)), 3);
    }

    #[test]
    fn line_of_clamps_offsets_past_the_end() {
        let src = "a\nb\nc\n";
        // Exactly at the end: all three newlines precede the offset.
        assert_eq!(line_of(src, TextSize::from(6)), 4);
        // Beyond the end: clamped instead of panicking.
        assert_eq!(line_of(src, TextSize::from(100)), 4);
        assert_eq!(line_of("", TextSize::from(5)), 1);
    }

    #[test]
    fn recognizes_environ_mutators() {
        assert!(is_environ_mutator("update"));
        assert!(is_environ_mutator("pop"));
        assert!(is_environ_mutator("setdefault"));
        assert!(is_environ_mutator("clear"));
        assert!(is_environ_mutator("popitem"));
        assert!(!is_environ_mutator("get"));
        assert!(!is_environ_mutator("keys"));
        assert!(!is_environ_mutator("copy"));
        assert!(!is_environ_mutator(""));
    }

    #[test]
    fn recognizes_upper_constants() {
        assert!(is_upper_constant("CACHE_DIR"));
        assert!(is_upper_constant("DEBUG"));
        assert!(is_upper_constant("MAX_2"));
        // A leading underscore does not stop a name from counting as a constant.
        assert!(is_upper_constant("_CACHE_DIR"));
        assert!(!is_upper_constant("cache_dir"));
        assert!(!is_upper_constant("CacheDir"));
        assert!(!is_upper_constant("fetch"));
        assert!(!is_upper_constant(""));
        assert!(!is_upper_constant("_"));
        assert!(!is_upper_constant("123"));
        // Callers pass only the last dotted segment; a dotted path is rejected.
        assert!(!is_upper_constant("pkg.CACHE_DIR"));
    }
}