use super::types::TypeImportSpec;
use super::FixtureDatabase;
use once_cell::sync::Lazy;
use rustpython_parser::ast::{Expr, Stmt};
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::sync::{Arc, OnceLock};
use tracing::{debug, info, warn};
static RUNTIME_STDLIB_MODULES: OnceLock<HashSet<String>> = OnceLock::new();
static STDLIB_MODULES: Lazy<HashSet<&'static str>> = Lazy::new(|| {
[
"os",
"sys",
"re",
"json",
"typing",
"collections",
"functools",
"itertools",
"pathlib",
"datetime",
"time",
"math",
"random",
"copy",
"io",
"abc",
"contextlib",
"dataclasses",
"enum",
"logging",
"unittest",
"asyncio",
"concurrent",
"multiprocessing",
"threading",
"subprocess",
"shutil",
"tempfile",
"glob",
"fnmatch",
"pickle",
"sqlite3",
"urllib",
"http",
"email",
"html",
"xml",
"socket",
"ssl",
"select",
"signal",
"struct",
"codecs",
"textwrap",
"string",
"difflib",
"inspect",
"dis",
"traceback",
"warnings",
"weakref",
"types",
"importlib",
"pkgutil",
"pprint",
"reprlib",
"numbers",
"decimal",
"fractions",
"statistics",
"hashlib",
"hmac",
"secrets",
"base64",
"binascii",
"zlib",
"gzip",
"bz2",
"lzma",
"zipfile",
"tarfile",
"csv",
"configparser",
"argparse",
"getopt",
"getpass",
"platform",
"errno",
"ctypes",
"__future__",
]
.into_iter()
.collect()
});
#[derive(Debug, Clone)]
#[allow(dead_code)] pub struct FixtureImport {
pub module_path: String,
pub is_star_import: bool,
pub imported_names: Vec<String>,
pub importing_file: PathBuf,
pub line: usize,
}
impl FixtureDatabase {
pub(crate) fn extract_fixture_imports(
&self,
stmts: &[Stmt],
file_path: &Path,
line_index: &[usize],
) -> Vec<FixtureImport> {
let mut imports = Vec::new();
for stmt in stmts {
if let Stmt::ImportFrom(import_from) = stmt {
let mut module = import_from
.module
.as_ref()
.map(|m| m.to_string())
.unwrap_or_default();
if let Some(ref level) = import_from.level {
let dots = ".".repeat(level.to_usize());
module = dots + &module;
}
if self.is_standard_library_module(&module) {
continue;
}
let line =
self.get_line_from_offset(import_from.range.start().to_usize(), line_index);
let is_star = import_from
.names
.iter()
.any(|alias| alias.name.as_str() == "*");
if is_star {
imports.push(FixtureImport {
module_path: module,
is_star_import: true,
imported_names: Vec::new(),
importing_file: file_path.to_path_buf(),
line,
});
} else {
let names: Vec<String> = import_from
.names
.iter()
.map(|alias| alias.asname.as_ref().unwrap_or(&alias.name).to_string())
.collect();
if !names.is_empty() {
imports.push(FixtureImport {
module_path: module,
is_star_import: false,
imported_names: names,
importing_file: file_path.to_path_buf(),
line,
});
}
}
}
}
imports
}
pub(crate) fn extract_pytest_plugins(&self, stmts: &[Stmt]) -> Vec<String> {
let mut modules = Vec::new();
for stmt in stmts {
let value = match stmt {
Stmt::Assign(assign) => {
let is_pytest_plugins = assign.targets.iter().any(|target| {
matches!(target, Expr::Name(name) if name.id.as_str() == "pytest_plugins")
});
if !is_pytest_plugins {
continue;
}
assign.value.as_ref()
}
Stmt::AnnAssign(ann_assign) => {
let is_pytest_plugins = matches!(
ann_assign.target.as_ref(),
Expr::Name(name) if name.id.as_str() == "pytest_plugins"
);
if !is_pytest_plugins {
continue;
}
match ann_assign.value.as_ref() {
Some(v) => v.as_ref(),
None => continue,
}
}
_ => continue,
};
modules.clear();
match value {
Expr::Constant(c) => {
if let rustpython_parser::ast::Constant::Str(s) = &c.value {
modules.push(s.to_string());
}
}
Expr::List(list) => {
for elt in &list.elts {
if let Expr::Constant(c) = elt {
if let rustpython_parser::ast::Constant::Str(s) = &c.value {
modules.push(s.to_string());
}
}
}
}
Expr::Tuple(tuple) => {
for elt in &tuple.elts {
if let Expr::Constant(c) = elt {
if let rustpython_parser::ast::Constant::Str(s) = &c.value {
modules.push(s.to_string());
}
}
}
}
_ => {
debug!("Ignoring dynamic pytest_plugins value (not a string/list/tuple)");
}
}
}
modules
}
fn is_standard_library_module(&self, module: &str) -> bool {
is_stdlib_module(module)
}
pub(crate) fn resolve_module_to_file(
&self,
module_path: &str,
importing_file: &Path,
) -> Option<PathBuf> {
debug!(
"Resolving module '{}' from file {:?}",
module_path, importing_file
);
let parent_dir = importing_file.parent()?;
if module_path.starts_with('.') {
self.resolve_relative_import(module_path, parent_dir)
} else {
self.resolve_absolute_import(module_path, parent_dir)
}
}
fn resolve_relative_import(&self, module_path: &str, base_dir: &Path) -> Option<PathBuf> {
let mut current_dir = base_dir.to_path_buf();
let mut chars = module_path.chars().peekable();
while chars.peek() == Some(&'.') {
chars.next();
if chars.peek() != Some(&'.') {
break;
}
current_dir = current_dir.parent()?.to_path_buf();
}
let remaining: String = chars.collect();
if remaining.is_empty() {
let init_path = current_dir.join("__init__.py");
if init_path.exists() {
return Some(init_path);
}
return None;
}
self.find_module_file(&remaining, ¤t_dir)
}
fn resolve_absolute_import(&self, module_path: &str, start_dir: &Path) -> Option<PathBuf> {
let mut current_dir = start_dir.to_path_buf();
loop {
if let Some(path) = self.find_module_file(module_path, ¤t_dir) {
return Some(path);
}
match current_dir.parent() {
Some(parent) => current_dir = parent.to_path_buf(),
None => break,
}
}
for sp in self.site_packages_paths.lock().unwrap().iter() {
if let Some(path) = self.find_module_file(module_path, sp) {
return Some(path);
}
}
for install in self.editable_install_roots.lock().unwrap().iter() {
if let Some(path) = self.find_module_file(module_path, &install.source_root) {
return Some(path);
}
}
None
}
fn find_module_file(&self, module_path: &str, base_dir: &Path) -> Option<PathBuf> {
let parts: Vec<&str> = module_path.split('.').collect();
let mut current_path = base_dir.to_path_buf();
for (i, part) in parts.iter().enumerate() {
let is_last = i == parts.len() - 1;
if is_last {
let py_file = current_path.join(format!("{}.py", part));
if py_file.exists() {
return Some(py_file);
}
let canonical_py_file = self.get_canonical_path(py_file.clone());
if self.file_cache.contains_key(&canonical_py_file) {
return Some(py_file);
}
let package_init = current_path.join(part).join("__init__.py");
if package_init.exists() {
return Some(package_init);
}
let canonical_package_init = self.get_canonical_path(package_init.clone());
if self.file_cache.contains_key(&canonical_package_init) {
return Some(package_init);
}
} else {
current_path = current_path.join(part);
if !current_path.is_dir() {
return None;
}
}
}
None
}
pub fn get_imported_fixtures(
&self,
file_path: &Path,
visited: &mut HashSet<PathBuf>,
) -> HashSet<String> {
let canonical_path = self.get_canonical_path(file_path.to_path_buf());
if visited.contains(&canonical_path) {
debug!("Circular import detected for {:?}, skipping", file_path);
return HashSet::new();
}
visited.insert(canonical_path.clone());
let Some(content) = self.get_file_content(&canonical_path) else {
return HashSet::new();
};
let content_hash = Self::hash_content(&content);
let current_version = self
.definitions_version
.load(std::sync::atomic::Ordering::SeqCst);
if let Some(cached) = self.imported_fixtures_cache.get(&canonical_path) {
let (cached_content_hash, cached_version, cached_fixtures) = cached.value();
if *cached_content_hash == content_hash && *cached_version == current_version {
debug!("Cache hit for imported fixtures in {:?}", canonical_path);
return cached_fixtures.as_ref().clone();
}
}
let imported_fixtures = self.compute_imported_fixtures(&canonical_path, &content, visited);
self.imported_fixtures_cache.insert(
canonical_path.clone(),
(
content_hash,
current_version,
Arc::new(imported_fixtures.clone()),
),
);
info!(
"Found {} imported fixtures for {:?}: {:?}",
imported_fixtures.len(),
file_path,
imported_fixtures
);
imported_fixtures
}
fn compute_imported_fixtures(
&self,
canonical_path: &Path,
content: &str,
visited: &mut HashSet<PathBuf>,
) -> HashSet<String> {
let mut imported_fixtures = HashSet::new();
let Some(parsed) = self.get_parsed_ast(canonical_path, content) else {
return imported_fixtures;
};
let line_index = self.get_line_index(canonical_path, content);
if let rustpython_parser::ast::Mod::Module(module) = parsed.as_ref() {
let imports = self.extract_fixture_imports(&module.body, canonical_path, &line_index);
for import in imports {
let Some(resolved_path) =
self.resolve_module_to_file(&import.module_path, canonical_path)
else {
debug!(
"Could not resolve module '{}' from {:?}",
import.module_path, canonical_path
);
continue;
};
let resolved_canonical = self.get_canonical_path(resolved_path);
debug!(
"Resolved import '{}' to {:?}",
import.module_path, resolved_canonical
);
if import.is_star_import {
if let Some(file_fixtures) = self.file_definitions.get(&resolved_canonical) {
for fixture_name in file_fixtures.iter() {
imported_fixtures.insert(fixture_name.clone());
}
}
let transitive = self.get_imported_fixtures(&resolved_canonical, visited);
imported_fixtures.extend(transitive);
} else {
for name in &import.imported_names {
if self.definitions.contains_key(name) {
imported_fixtures.insert(name.clone());
}
}
}
}
let plugin_modules = self.extract_pytest_plugins(&module.body);
for module_path in plugin_modules {
let Some(resolved_path) = self.resolve_module_to_file(&module_path, canonical_path)
else {
debug!(
"Could not resolve pytest_plugins module '{}' from {:?}",
module_path, canonical_path
);
continue;
};
let resolved_canonical = self.get_canonical_path(resolved_path);
debug!(
"Resolved pytest_plugins '{}' to {:?}",
module_path, resolved_canonical
);
if let Some(file_fixtures) = self.file_definitions.get(&resolved_canonical) {
for fixture_name in file_fixtures.iter() {
imported_fixtures.insert(fixture_name.clone());
}
}
let transitive = self.get_imported_fixtures(&resolved_canonical, visited);
imported_fixtures.extend(transitive);
}
}
imported_fixtures
}
pub fn is_fixture_imported_in_file(&self, fixture_name: &str, file_path: &Path) -> bool {
let mut visited = HashSet::new();
let imported = self.get_imported_fixtures(file_path, &mut visited);
imported.contains(fixture_name)
}
}
pub(crate) fn is_stdlib_module(module: &str) -> bool {
let first_part = module.split('.').next().unwrap_or(module);
if let Some(runtime) = RUNTIME_STDLIB_MODULES.get() {
runtime.contains(first_part)
} else {
STDLIB_MODULES.contains(first_part)
}
}
fn find_venv_python(venv_path: &Path) -> Option<PathBuf> {
for name in &["python3", "python"] {
let candidate = venv_path.join("bin").join(name);
if candidate.is_file() {
return Some(candidate);
}
}
for name in &["python3.exe", "python.exe"] {
let candidate = venv_path.join("Scripts").join(name);
if candidate.is_file() {
return Some(candidate);
}
}
None
}
pub(crate) fn try_init_stdlib_from_python(venv_path: &Path) -> bool {
if RUNTIME_STDLIB_MODULES.get().is_some() {
return true;
}
let Some(python) = find_venv_python(venv_path) else {
debug!(
"try_init_stdlib_from_python: no Python binary found in {:?}",
venv_path
);
return false;
};
debug!(
"try_init_stdlib_from_python: querying stdlib module names via {:?}",
python
);
let output = match std::process::Command::new(&python)
.args([
"-I",
"-c",
"import sys; print('\\n'.join(sorted(sys.stdlib_module_names)))",
])
.output()
{
Ok(o) => o,
Err(e) => {
warn!(
"try_init_stdlib_from_python: failed to run {:?}: {}",
python, e
);
return false;
}
};
if !output.status.success() {
debug!(
"try_init_stdlib_from_python: Python exited with {:?} \
(Python < 3.10 or other error) — using built-in stdlib list",
output.status.code()
);
return false;
}
let stdout = match std::str::from_utf8(&output.stdout) {
Ok(s) => s,
Err(e) => {
warn!(
"try_init_stdlib_from_python: Python output is not valid UTF-8: {}",
e
);
return false;
}
};
let modules: HashSet<String> = stdout
.lines()
.map(str::trim)
.filter(|l| !l.is_empty())
.map(str::to_owned)
.collect();
if modules.is_empty() {
warn!("try_init_stdlib_from_python: Python returned an empty module list");
return false;
}
info!(
"try_init_stdlib_from_python: loaded {} stdlib module names from {:?}",
modules.len(),
python
);
let _ = RUNTIME_STDLIB_MODULES.set(modules);
true
}
impl FixtureDatabase {
pub(crate) fn file_path_to_module_path(file_path: &Path) -> Option<String> {
let stem = file_path.file_stem()?.to_str()?;
let mut components = if stem == "__init__" {
vec![]
} else {
vec![stem.to_string()]
};
let mut current = file_path.parent()?;
loop {
if current.join("__init__.py").exists() {
let name = current.file_name().and_then(|n| n.to_str())?;
components.push(name.to_string());
match current.parent() {
Some(parent) => current = parent,
None => break,
}
} else {
break;
}
}
if components.is_empty() {
return None;
}
components.reverse();
Some(components.join("."))
}
fn resolve_relative_module_to_string(
&self,
module: &str,
level: usize,
fixture_file: &Path,
) -> Option<String> {
let mut base = fixture_file.parent()?;
for _ in 1..level {
base = base.parent()?;
}
let target = if module.is_empty() {
base.join("__init__.py")
} else {
let rel_path = module.replace('.', "/");
base.join(format!("{}.py", rel_path))
};
Self::file_path_to_module_path(&target)
}
pub(crate) fn build_name_to_import_map(
&self,
stmts: &[Stmt],
fixture_file: &Path,
) -> HashMap<String, TypeImportSpec> {
let mut map = HashMap::new();
for stmt in stmts {
match stmt {
Stmt::Import(import_stmt) => {
for alias in &import_stmt.names {
let module = alias.name.to_string();
let (check_name, import_statement) = if let Some(ref asname) = alias.asname
{
let asname_str = asname.to_string();
(
asname_str.clone(),
format!("import {} as {}", module, asname_str),
)
} else {
let top_level = module.split('.').next().unwrap_or(&module).to_string();
(top_level, format!("import {}", module))
};
map.insert(
check_name.clone(),
TypeImportSpec {
check_name,
import_statement,
},
);
}
}
Stmt::ImportFrom(import_from) => {
let level = import_from
.level
.as_ref()
.map(|l| l.to_usize())
.unwrap_or(0);
let raw_module = import_from
.module
.as_ref()
.map(|m| m.to_string())
.unwrap_or_default();
let abs_module = if level > 0 {
match self.resolve_relative_module_to_string(
&raw_module,
level,
fixture_file,
) {
Some(m) => m,
None => {
debug!(
"Could not resolve relative import '.{}' from {:?}, skipping",
raw_module, fixture_file
);
continue;
}
}
} else {
raw_module
};
for alias in &import_from.names {
if alias.name.as_str() == "*" {
continue; }
let name = alias.name.to_string();
let (check_name, import_statement) = if let Some(ref asname) = alias.asname
{
let asname_str = asname.to_string();
(
asname_str.clone(),
format!("from {} import {} as {}", abs_module, name, asname_str),
)
} else {
(name.clone(), format!("from {} import {}", abs_module, name))
};
map.insert(
check_name.clone(),
TypeImportSpec {
check_name,
import_statement,
},
);
}
}
_ => {}
}
}
map
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
struct TempDir(std::path::PathBuf);
impl TempDir {
fn new(name: &str) -> Self {
let path = std::env::temp_dir().join(name);
fs::create_dir_all(&path).unwrap();
Self(path)
}
fn path(&self) -> &std::path::Path {
&self.0
}
}
impl Drop for TempDir {
fn drop(&mut self) {
let _ = fs::remove_dir_all(&self.0);
}
}
fn touch(path: &std::path::Path) {
fs::create_dir_all(path.parent().unwrap()).unwrap();
fs::write(path, b"").unwrap();
}
#[test]
fn test_find_venv_python_unix_python3() {
let dir = TempDir::new("fvp_unix_py3");
touch(&dir.path().join("bin/python3"));
let result = find_venv_python(dir.path());
assert_eq!(result, Some(dir.path().join("bin/python3")));
}
#[test]
fn test_find_venv_python_unix_python_fallback() {
let dir = TempDir::new("fvp_unix_py");
touch(&dir.path().join("bin/python"));
let result = find_venv_python(dir.path());
assert_eq!(result, Some(dir.path().join("bin/python")));
}
#[test]
fn test_find_venv_python_unix_prefers_python3_over_python() {
let dir = TempDir::new("fvp_unix_prefer");
touch(&dir.path().join("bin/python3"));
touch(&dir.path().join("bin/python"));
let result = find_venv_python(dir.path());
assert_eq!(
result,
Some(dir.path().join("bin/python3")),
"python3 should be preferred over python"
);
}
#[test]
fn test_find_venv_python_windows_style() {
let dir = TempDir::new("fvp_win_py");
touch(&dir.path().join("Scripts/python.exe"));
let result = find_venv_python(dir.path());
assert_eq!(result, Some(dir.path().join("Scripts/python.exe")));
}
#[test]
fn test_find_venv_python_windows_prefers_python3_exe() {
let dir = TempDir::new("fvp_win_prefer");
touch(&dir.path().join("Scripts/python3.exe"));
touch(&dir.path().join("Scripts/python.exe"));
let result = find_venv_python(dir.path());
assert_eq!(
result,
Some(dir.path().join("Scripts/python3.exe")),
"python3.exe should be preferred over python.exe"
);
}
#[test]
fn test_find_venv_python_not_found() {
let dir = TempDir::new("fvp_empty");
assert_eq!(find_venv_python(dir.path()), None);
}
#[test]
fn test_find_venv_python_wrong_layout() {
let dir = TempDir::new("fvp_wrong_layout");
touch(&dir.path().join("python3"));
assert_eq!(find_venv_python(dir.path()), None);
}
#[test]
fn test_try_init_stdlib_no_python_returns_false_or_already_set() {
let dir = TempDir::new("fvp_no_python");
let _ = try_init_stdlib_from_python(dir.path());
assert!(is_stdlib_module("os"), "os must always be stdlib");
assert!(is_stdlib_module("sys"), "sys must always be stdlib");
assert!(!is_stdlib_module("pytest"), "pytest is not stdlib");
assert!(!is_stdlib_module("flask"), "flask is not stdlib");
}
#[test]
fn test_module_path_regular_file_no_package() {
let dir = TempDir::new("fptmp_plain");
let file = dir.path().join("conftest.py");
fs::write(&file, "").unwrap();
assert_eq!(
FixtureDatabase::file_path_to_module_path(&file),
Some("conftest".to_string())
);
}
#[test]
fn test_module_path_regular_file_in_package() {
let dir = TempDir::new("fptmp_pkg");
let pkg = dir.path().join("pkg");
fs::create_dir_all(&pkg).unwrap();
fs::write(pkg.join("__init__.py"), "").unwrap();
let file = pkg.join("module.py");
fs::write(&file, "").unwrap();
assert_eq!(
FixtureDatabase::file_path_to_module_path(&file),
Some("pkg.module".to_string())
);
}
#[test]
fn test_module_path_init_file_is_package_root() {
let dir = TempDir::new("fptmp_init");
let pkg = dir.path().join("pkg");
fs::create_dir_all(&pkg).unwrap();
let init = pkg.join("__init__.py");
fs::write(&init, "").unwrap();
assert_eq!(
FixtureDatabase::file_path_to_module_path(&init),
Some("pkg".to_string())
);
}
#[test]
fn test_module_path_nested_init_file() {
let dir = TempDir::new("fptmp_nested_init");
let pkg = dir.path().join("pkg");
let sub = pkg.join("sub");
fs::create_dir_all(&sub).unwrap();
fs::write(pkg.join("__init__.py"), "").unwrap();
let init = sub.join("__init__.py");
fs::write(&init, "").unwrap();
assert_eq!(
FixtureDatabase::file_path_to_module_path(&init),
Some("pkg.sub".to_string())
);
}
#[test]
fn test_module_path_nested_package() {
let dir = TempDir::new("fptmp_nested");
let pkg = dir.path().join("pkg");
let sub = pkg.join("sub");
fs::create_dir_all(&sub).unwrap();
fs::write(pkg.join("__init__.py"), "").unwrap();
fs::write(sub.join("__init__.py"), "").unwrap();
let file = sub.join("module.py");
fs::write(&file, "").unwrap();
assert_eq!(
FixtureDatabase::file_path_to_module_path(&file),
Some("pkg.sub.module".to_string())
);
}
#[test]
fn test_module_path_conftest_in_package() {
let dir = TempDir::new("fptmp_conftest_pkg");
let pkg = dir.path().join("mypkg");
fs::create_dir_all(&pkg).unwrap();
fs::write(pkg.join("__init__.py"), "").unwrap();
let file = pkg.join("conftest.py");
fs::write(&file, "").unwrap();
assert_eq!(
FixtureDatabase::file_path_to_module_path(&file),
Some("mypkg.conftest".to_string())
);
}
#[test]
fn test_build_map_dotted_import_keyed_by_top_level() {
let db = FixtureDatabase::new();
let map = db.get_name_to_import_map(
&PathBuf::from("/tmp/test_bm_dotted.py"),
"import collections.abc\n",
);
let spec = map
.get("collections")
.expect("key 'collections' must be present");
assert_eq!(spec.check_name, "collections");
assert_eq!(spec.import_statement, "import collections.abc");
assert!(
!map.contains_key("collections.abc"),
"full dotted path must not be a key; only the top-level bound name is"
);
}
#[test]
fn test_build_map_two_level_dotted_import_keyed_by_top_level() {
let db = FixtureDatabase::new();
let map = db.get_name_to_import_map(
&PathBuf::from("/tmp/test_bm_two_level.py"),
"import xml.etree.ElementTree\n",
);
let spec = map.get("xml").expect("key 'xml' must be present");
assert_eq!(spec.check_name, "xml");
assert_eq!(spec.import_statement, "import xml.etree.ElementTree");
assert!(
!map.contains_key("xml.etree.ElementTree"),
"full dotted path must not be a key"
);
assert!(
!map.contains_key("xml.etree"),
"partial dotted path must not be a key"
);
}
#[test]
fn test_build_map_simple_import_unaffected() {
let db = FixtureDatabase::new();
let map =
db.get_name_to_import_map(&PathBuf::from("/tmp/test_bm_simple.py"), "import pathlib\n");
let spec = map.get("pathlib").expect("key 'pathlib' must be present");
assert_eq!(spec.check_name, "pathlib");
assert_eq!(spec.import_statement, "import pathlib");
}
#[test]
fn test_build_map_aliased_dotted_import_unaffected() {
let db = FixtureDatabase::new();
let map = db.get_name_to_import_map(
&PathBuf::from("/tmp/test_bm_aliased.py"),
"import collections.abc as abc_mod\n",
);
let spec = map.get("abc_mod").expect("key 'abc_mod' must be present");
assert_eq!(spec.check_name, "abc_mod");
assert_eq!(spec.import_statement, "import collections.abc as abc_mod");
assert!(
!map.contains_key("collections"),
"top-level name must not be keyed when alias present"
);
assert!(
!map.contains_key("collections.abc"),
"dotted path must not be keyed when alias present"
);
}
}