use super::imports::try_init_stdlib_from_python;
use super::types::{FixtureDefinition, FixtureScope, TypeImportSpec};
use super::FixtureDatabase;
use glob::Pattern;
use rayon::prelude::*;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use tracing::{debug, error, info, warn};
use walkdir::WalkDir;
/// One `name = value` line read from the `[pytest11]` section of an
/// `entry_points.txt` file.
#[derive(Debug, Clone)]
pub(crate) struct Pytest11EntryPoint {
/// Text to the left of the first `=`, with surrounding whitespace trimmed.
pub(crate) name: String,
/// Text to the right of the first `=`, with surrounding whitespace trimmed.
/// It is stored as written, so it may still carry a `:attr` suffix.
pub(crate) module_path: String,
}
impl FixtureDatabase {
/// Directory names that the workspace scan refuses to descend into.
///
/// Each entry is compared against a single path component by exact,
/// case-sensitive equality in `should_skip_directory`.
// NOTE(review): judging by the names, these are version-control, virtualenv,
// cache, build-output, dependency and editor directories - confirm before
// adding or removing entries.
const SKIP_DIRECTORIES: &'static [&'static str] = &[
".git",
".hg",
".svn",
".venv",
"venv",
"env",
".env",
"__pycache__",
".pytest_cache",
".mypy_cache",
".ruff_cache",
".tox",
".nox",
"build",
"dist",
".eggs",
"node_modules",
"bower_components",
"target",
".idea",
".vscode",
".cache",
".local",
"vendor",
"site-packages",
];
/// Reports whether a directory called `dir_name` should be excluded from
/// scanning.
///
/// A name is excluded when it appears verbatim in `SKIP_DIRECTORIES` or when
/// it ends with `.egg-info`.
pub(crate) fn should_skip_directory(dir_name: &str) -> bool {
    let is_listed = Self::SKIP_DIRECTORIES
        .iter()
        .any(|skipped| *skipped == dir_name);
    is_listed || dir_name.ends_with(".egg-info")
}
/// Scans the workspace rooted at `root_path` without any exclude patterns.
///
/// This simply forwards to `scan_workspace_with_excludes` with an empty
/// pattern slice.
pub fn scan_workspace(&self, root_path: &Path) {
self.scan_workspace_with_excludes(root_path, &[]);
}
/// Scans the workspace rooted at `root_path` for pytest files and analyzes them.
///
/// The walk collects every `conftest.py`, `test_*.py` and `*_test.py` file,
/// pruning directories for which `should_skip_directory` returns `true` and
/// dropping any path whose workspace-relative form matches one of
/// `exclude_patterns`. The collected files are read and analyzed in parallel,
/// after which the virtual-environment scan and the imported-module scan run.
///
/// The (canonicalized, when possible) root is stored in `self.workspace_root`
/// before the existence check, so it is recorded even if the scan is skipped
/// because the path does not exist.
///
/// # Panics
///
/// Panics if the `workspace_root` mutex is poisoned.
pub fn scan_workspace_with_excludes(&self, root_path: &Path, exclude_patterns: &[Pattern]) {
    // Fall back to the path as given when canonicalization fails (for
    // example because the path does not exist).
    let root_path_buf = root_path
        .canonicalize()
        .unwrap_or_else(|_| root_path.to_path_buf());
    let root_path = root_path_buf.as_path();
    info!("Scanning workspace: {:?}", root_path);
    *self.workspace_root.lock().unwrap() = Some(root_path.to_path_buf());
    if !root_path.exists() {
        warn!(
            "Workspace path does not exist, skipping scan: {:?}",
            root_path
        );
        return;
    }

    let mut files_to_process: Vec<PathBuf> = Vec::new();
    let mut skipped_dirs = 0;
    let walker = WalkDir::new(root_path).into_iter().filter_entry(|entry| {
        // `filter_entry` is also applied to the root entry itself. The root
        // must never be pruned by name, otherwise a workspace that happens to
        // live in a directory called e.g. `build` or `env` yields no files.
        if entry.depth() == 0 || entry.file_type().is_file() {
            return true;
        }
        match entry.file_name().to_str() {
            Some(dir_name) => !Self::should_skip_directory(dir_name),
            // Names that are not valid UTF-8 cannot match the skip list.
            None => true,
        }
    });

    for entry in walker {
        let entry = match entry {
            Ok(e) => e,
            Err(err) => {
                if err
                    .io_error()
                    .is_some_and(|e| e.kind() == std::io::ErrorKind::PermissionDenied)
                {
                    warn!(
                        "Permission denied accessing path during workspace scan: {}",
                        err
                    );
                } else {
                    debug!("Error during workspace scan: {}", err);
                }
                continue;
            }
        };
        let path = entry.path();
        // Only components below the workspace root are checked, so the names
        // of the root's ancestors never cause a skip.
        let relative = path.strip_prefix(root_path).ok();

        if let Some(relative) = relative {
            let in_skipped_component = relative.components().any(|c| {
                c.as_os_str()
                    .to_str()
                    .is_some_and(Self::should_skip_directory)
            });
            if in_skipped_component {
                skipped_dirs += 1;
                continue;
            }
        }

        if !exclude_patterns.is_empty() {
            if let Some(relative) = relative {
                let relative_str = relative.to_string_lossy();
                if exclude_patterns.iter().any(|p| p.matches(&relative_str)) {
                    debug!("Skipping excluded path: {:?}", path);
                    continue;
                }
            }
        }

        if let Some(filename) = path.file_name().and_then(|n| n.to_str()) {
            let is_pytest_file = filename == "conftest.py"
                || (filename.starts_with("test_") && filename.ends_with(".py"))
                || filename.ends_with("_test.py");
            if is_pytest_file {
                files_to_process.push(path.to_path_buf());
            }
        }
    }

    if skipped_dirs > 0 {
        debug!("Skipped {} entries in filtered directories", skipped_dirs);
    }
    let total_files = files_to_process.len();
    info!("Found {} test/conftest files to process", total_files);

    // Counters are atomics because the files are processed on the rayon pool.
    let error_count = AtomicUsize::new(0);
    let permission_denied_count = AtomicUsize::new(0);
    files_to_process.par_iter().for_each(|path| {
        debug!("Found test/conftest file: {:?}", path);
        match std::fs::read_to_string(path) {
            Ok(content) => {
                self.analyze_file_fresh(path.clone(), &content);
            }
            Err(err) => {
                if err.kind() == std::io::ErrorKind::PermissionDenied {
                    debug!("Permission denied reading file: {:?}", path);
                    permission_denied_count.fetch_add(1, Ordering::Relaxed);
                } else {
                    error!("Failed to read file {:?}: {}", path, err);
                    error_count.fetch_add(1, Ordering::Relaxed);
                }
            }
        }
    });

    let errors = error_count.load(Ordering::Relaxed);
    let permission_errors = permission_denied_count.load(Ordering::Relaxed);
    if errors > 0 {
        warn!("Workspace scan completed with {} read errors", errors);
    }
    if permission_errors > 0 {
        warn!(
            "Workspace scan: skipped {} files due to permission denied",
            permission_errors
        );
    }
    info!(
        "Workspace scan complete. Processed {} files ({} permission denied, {} errors)",
        total_files, permission_errors, errors
    );

    self.scan_venv_fixtures(root_path);
    self.scan_imported_fixture_modules(root_path);
    info!("Total fixtures defined: {}", self.definitions.len());
    info!("Total files with fixture usages: {}", self.usages.len());
}
/// Follows imports out of already-cached files and analyzes the modules they
/// reference, repeating until an iteration discovers nothing new.
///
/// The starting set is every key of `file_cache` that is a
/// `conftest.py` / `test_*.py` / `*_test.py` file, lives under a recorded
/// site-packages path or editable-install source root, or is registered in
/// `plugin_fixture_files`. For each such file both the imports returned by
/// `extract_fixture_imports` and the modules returned by
/// `extract_pytest_plugins` are resolved to files.
///
/// `_root_path` is currently unused.
fn scan_imported_fixture_modules(&self, _root_path: &Path) {
use std::collections::HashSet;
info!("Scanning for imported fixture modules");
// Files whose imports have already been inspected in this call.
let mut processed_files: HashSet<std::path::PathBuf> = HashSet::new();
// Snapshot the lock-protected lists so no mutex is held during the scan.
let site_packages_paths = self.site_packages_paths.lock().unwrap().clone();
let editable_roots: Vec<PathBuf> = self
.editable_install_roots
.lock()
.unwrap()
.iter()
.map(|e| e.source_root.clone())
.collect();
let mut files_to_check: Vec<std::path::PathBuf> = self
.file_cache
.iter()
.filter(|entry| {
let key = entry.key();
let is_conftest_or_test = key
.file_name()
.and_then(|n| n.to_str())
.map(|n| {
n == "conftest.py"
|| (n.starts_with("test_") && n.ends_with(".py"))
|| n.ends_with("_test.py")
})
.unwrap_or(false);
let is_venv_plugin = site_packages_paths.iter().any(|sp| key.starts_with(sp));
let is_editable_plugin = editable_roots.iter().any(|er| key.starts_with(er));
let is_entry_point_plugin = self.plugin_fixture_files.contains_key(key);
is_conftest_or_test || is_venv_plugin || is_editable_plugin || is_entry_point_plugin
})
.map(|entry| entry.key().clone())
.collect();
if files_to_check.is_empty() {
debug!("No conftest/test/plugin files found, skipping import scan");
return;
}
info!(
"Starting import scan with {} conftest/test/plugin files",
files_to_check.len()
);
// Modules that were already in `file_cache` when they got marked as plugin
// files; they are re-analyzed once after the loop finishes.
let mut reanalyze_as_plugin: HashSet<std::path::PathBuf> = HashSet::new();
let mut iteration = 0;
while !files_to_check.is_empty() {
iteration += 1;
debug!(
"Import scan iteration {}: checking {} files",
iteration,
files_to_check.len()
);
// Resolved modules seen this iteration that are neither processed nor cached.
let mut new_modules: HashSet<std::path::PathBuf> = HashSet::new();
for file_path in &files_to_check {
if processed_files.contains(file_path) {
continue;
}
processed_files.insert(file_path.clone());
// Read once per file, before any of its imports are marked below.
let importer_is_plugin = self.plugin_fixture_files.contains_key(file_path);
let Some(content) = self.get_file_content(file_path) else {
continue;
};
let Some(parsed) = self.get_parsed_ast(file_path, &content) else {
continue;
};
let line_index = self.get_line_index(file_path, &content);
if let rustpython_parser::ast::Mod::Module(module) = parsed.as_ref() {
let imports =
self.extract_fixture_imports(&module.body, file_path, &line_index);
for import in imports {
if let Some(resolved_path) =
self.resolve_module_to_file(&import.module_path, file_path)
{
let canonical = self.get_canonical_path(resolved_path);
// Plugin status is only passed on through star imports made by a plugin file.
let should_mark_plugin = importer_is_plugin && import.is_star_import;
if should_mark_plugin
&& !self.plugin_fixture_files.contains_key(&canonical)
{
self.plugin_fixture_files.insert(canonical.clone(), ());
if self.file_cache.contains_key(&canonical) {
reanalyze_as_plugin.insert(canonical.clone());
}
}
if !processed_files.contains(&canonical)
&& !self.file_cache.contains_key(&canonical)
{
new_modules.insert(canonical);
}
}
}
// Modules returned by `extract_pytest_plugins` get the same treatment,
// except that a plugin importer always passes its plugin status on.
let plugin_modules = self.extract_pytest_plugins(&module.body);
for module_path in plugin_modules {
if let Some(resolved_path) =
self.resolve_module_to_file(&module_path, file_path)
{
let canonical = self.get_canonical_path(resolved_path);
if importer_is_plugin
&& !self.plugin_fixture_files.contains_key(&canonical)
{
self.plugin_fixture_files.insert(canonical.clone(), ());
if self.file_cache.contains_key(&canonical) {
reanalyze_as_plugin.insert(canonical.clone());
}
}
if !processed_files.contains(&canonical)
&& !self.file_cache.contains_key(&canonical)
{
new_modules.insert(canonical);
}
}
}
}
}
if new_modules.is_empty() {
debug!("No new modules found in iteration {}", iteration);
break;
}
info!(
"Iteration {}: found {} new modules to analyze",
iteration,
new_modules.len()
);
for module_path in &new_modules {
if module_path.exists() {
debug!("Analyzing imported module: {:?}", module_path);
match std::fs::read_to_string(module_path) {
Ok(content) => {
self.analyze_file_fresh(module_path.clone(), &content);
}
Err(err) => {
debug!("Failed to read imported module {:?}: {}", module_path, err);
}
}
}
}
// The modules just analyzed become the input of the next iteration so
// that their own imports are followed as well.
files_to_check = new_modules.into_iter().collect();
}
if !reanalyze_as_plugin.is_empty() {
info!(
"Re-analyzing {} cached module(s) newly marked as plugin files",
reanalyze_as_plugin.len()
);
for module_path in &reanalyze_as_plugin {
if let Some(content) = self.get_file_content(module_path) {
debug!("Re-analyzing as plugin: {:?}", module_path);
self.analyze_file(module_path.clone(), &content);
}
}
}
info!(
"Imported fixture module scan complete after {} iterations",
iteration
);
}
fn scan_venv_fixtures(&self, root_path: &Path) {
info!("Scanning for pytest plugins in virtual environment");
let venv_paths = vec![
root_path.join(".venv"),
root_path.join("venv"),
root_path.join("env"),
];
info!("Checking for venv in: {:?}", root_path);
for venv_path in &venv_paths {
debug!("Checking venv path: {:?}", venv_path);
if venv_path.exists() {
info!("Found virtual environment at: {:?}", venv_path);
self.scan_venv_site_packages(venv_path);
return;
} else {
debug!(" Does not exist: {:?}", venv_path);
}
}
if let Ok(venv) = std::env::var("VIRTUAL_ENV") {
info!("Found VIRTUAL_ENV environment variable: {}", venv);
let venv_path = std::path::PathBuf::from(venv);
if venv_path.exists() {
let venv_path = venv_path.canonicalize().unwrap_or(venv_path);
info!("Using VIRTUAL_ENV: {:?}", venv_path);
self.scan_venv_site_packages(&venv_path);
return;
} else {
warn!("VIRTUAL_ENV path does not exist: {:?}", venv_path);
}
} else {
debug!("No VIRTUAL_ENV environment variable set");
}
warn!("No virtual environment found - third-party fixtures will not be available");
}
/// Finds the site-packages directory of the environment at `venv_path`,
/// records it in `site_packages_paths` and scans it for pytest plugins.
///
/// The `lib/python*/site-packages` layout is tried first, then
/// `Lib/site-packages`. Only the first directory found is used. A warning is
/// logged when neither layout is present.
fn scan_venv_site_packages(&self, venv_path: &Path) {
    info!("Scanning venv site-packages in: {:?}", venv_path);
    let stdlib_from_python = try_init_stdlib_from_python(venv_path);
    if stdlib_from_python {
        info!("stdlib module list populated from venv Python");
    } else {
        info!(
            "using built-in stdlib module list \
             (Python < 3.10 or binary not found in {:?})",
            venv_path
        );
    }

    let lib_path = venv_path.join("lib");
    debug!("Checking lib path: {:?}", lib_path);
    // `None` both when `lib` is missing and when it cannot be listed.
    let lib_entries = if lib_path.exists() {
        std::fs::read_dir(&lib_path).ok()
    } else {
        None
    };
    for entry in lib_entries.into_iter().flatten().flatten() {
        let python_dir = entry.path();
        let dirname = python_dir.file_name().unwrap_or_default().to_string_lossy();
        debug!("Found in lib: {:?}", dirname);
        if !python_dir.is_dir() || !dirname.starts_with("python") {
            continue;
        }
        let candidate = python_dir.join("site-packages");
        debug!("Checking site-packages: {:?}", candidate);
        if !candidate.exists() {
            continue;
        }
        let site_packages = candidate.canonicalize().unwrap_or(candidate);
        info!("Found site-packages: {:?}", site_packages);
        self.site_packages_paths
            .lock()
            .unwrap()
            .push(site_packages.clone());
        self.scan_pytest_plugins(&site_packages);
        return;
    }

    let windows_candidate = venv_path.join("Lib/site-packages");
    debug!("Checking Windows path: {:?}", windows_candidate);
    if !windows_candidate.exists() {
        warn!("Could not find site-packages in venv: {:?}", venv_path);
        return;
    }
    let windows_site_packages = windows_candidate
        .canonicalize()
        .unwrap_or(windows_candidate);
    info!("Found site-packages (Windows): {:?}", windows_site_packages);
    self.site_packages_paths
        .lock()
        .unwrap()
        .push(windows_site_packages.clone());
    self.scan_pytest_plugins(&windows_site_packages);
}
/// Extracts the entries of the `[pytest11]` section from the text of an
/// `entry_points.txt` file.
///
/// Every line is trimmed. Blank lines and comment lines (starting with `#`
/// or `;`, the two comment prefixes of the INI format) are ignored. A line
/// that starts with `[` and ends with `]` switches the current section.
/// Inside `[pytest11]`, each line containing `=` is split at the first `=`
/// into a trimmed name and a trimmed module path; lines without `=` are
/// ignored. Entries are returned in file order.
fn parse_pytest11_entry_points(content: &str) -> Vec<Pytest11EntryPoint> {
    let mut results = Vec::new();
    let mut in_pytest11_section = false;
    for line in content.lines() {
        let line = line.trim();
        // Comments are dropped before anything else so that a commented-out
        // entry such as `; old = pkg.plugin` is never treated as an entry.
        if line.is_empty() || line.starts_with(['#', ';']) {
            continue;
        }
        if line.starts_with('[') && line.ends_with(']') {
            in_pytest11_section = line == "[pytest11]";
            continue;
        }
        if !in_pytest11_section {
            continue;
        }
        if let Some((name, module_path)) = line.split_once('=') {
            results.push(Pytest11EntryPoint {
                name: name.trim().to_string(),
                module_path: module_path.trim().to_string(),
            });
        }
    }
    results
}
/// Resolves an entry-point value to a Python source file under `site_packages`.
///
/// `module_path` has the form `module[:attr] [extras]`. Only the dotted module
/// name is used: anything from the first `[` on and anything from the first
/// `:` on is discarded, and surrounding whitespace is trimmed, since spaces
/// around the colon and before the extras are permitted in entry-point values.
///
/// The dotted name is mapped to `<site_packages>/a/b/c.py`; if that file does
/// not exist, `<site_packages>/a/b/c/__init__.py` is tried. The result is the
/// canonicalized path of whichever exists.
///
/// Returns `None` when a segment is empty, contains a NUL byte or a path
/// separator, when no matching file exists, or when the canonical path of the
/// match does not lie under the canonical `site_packages` (for example because
/// a symlink points elsewhere).
fn resolve_entry_point_module_to_path(
    site_packages: &Path,
    module_path: &str,
) -> Option<PathBuf> {
    // Strip `[extras]` first, then the `:attr` part, then stray whitespace.
    let object_ref = module_path
        .split_once('[')
        .map_or(module_path, |(before, _)| before);
    let module_name = object_ref
        .split_once(':')
        .map_or(object_ref, |(module, _)| module)
        .trim();

    let parts: Vec<&str> = module_name.split('.').collect();
    // An empty segment covers "", leading/trailing dots and consecutive dots.
    // Separators are rejected because `PathBuf::push` would otherwise add
    // several components at once, or replace the path if the segment is absolute.
    let has_invalid_part = parts
        .iter()
        .any(|p| p.is_empty() || p.contains('\0') || p.contains(['/', '\\']));
    if has_invalid_part {
        return None;
    }

    let mut path = site_packages.to_path_buf();
    path.extend(&parts);

    // Accept a candidate only if, after resolving symlinks, it is still
    // located inside `site_packages`.
    let check_bounded = |candidate: &Path| -> Option<PathBuf> {
        let canonical = candidate.canonicalize().ok()?;
        let base_canonical = site_packages.canonicalize().ok()?;
        canonical.starts_with(&base_canonical).then_some(canonical)
    };

    let py_file = path.with_extension("py");
    if py_file.exists() {
        return check_bounded(&py_file);
    }
    if path.is_dir() {
        let init_file = path.join("__init__.py");
        if init_file.exists() {
            return check_bounded(&init_file);
        }
    }
    None
}
/// Registers one `.py` file as a plugin fixture file and analyzes it.
///
/// Paths whose extension is not `py` are ignored. The canonical path (or the
/// path as given when canonicalization fails) is inserted into
/// `plugin_fixture_files`; the file is analyzed only if it can be read.
fn scan_single_plugin_file(&self, file_path: &Path) {
    let is_python = matches!(
        file_path.extension().and_then(|ext| ext.to_str()),
        Some("py")
    );
    if !is_python {
        return;
    }
    debug!("Scanning plugin file: {:?}", file_path);

    let plugin_key = match file_path.canonicalize() {
        Ok(resolved) => resolved,
        Err(_) => file_path.to_path_buf(),
    };
    self.plugin_fixture_files.insert(plugin_key, ());

    if let Ok(source) = std::fs::read_to_string(file_path) {
        self.analyze_file(file_path.to_path_buf(), &source);
    }
}
/// Scans every pytest plugin declared by the `entry_points.txt` inside
/// `dist_info_path` and returns how many entries were scanned.
///
/// Each `[pytest11]` entry is resolved under `site_packages` first and, if
/// that fails, under the known editable installs. An entry resolving to an
/// `__init__.py` has its whole package directory scanned; any other regular
/// file is scanned on its own. Returns `0` when the file cannot be read or
/// declares no pytest11 entries.
fn load_plugin_from_entry_point(&self, dist_info_path: &Path, site_packages: &Path) -> usize {
    let Ok(content) = std::fs::read_to_string(dist_info_path.join("entry_points.txt")) else {
        return 0;
    };

    let mut scanned_count = 0;
    for entry in Self::parse_pytest11_entry_points(&content) {
        debug!(
            "Found pytest11 entry: {} = {}",
            entry.name, entry.module_path
        );

        let resolved =
            match Self::resolve_entry_point_module_to_path(site_packages, &entry.module_path) {
                Some(found) => Some(found),
                None => self.resolve_entry_point_in_editable_installs(&entry.module_path),
            };
        let Some(path) = resolved else {
            debug!(
                "Could not resolve module path: {} for plugin {}",
                entry.module_path, entry.name
            );
            continue;
        };

        let is_package_init = path.file_name().and_then(|n| n.to_str()) == Some("__init__.py");
        if is_package_init {
            let package_dir = path.parent().expect("__init__.py must have parent");
            info!(
                "Scanning pytest plugin package directory for {}: {:?}",
                entry.name, package_dir
            );
            self.scan_plugin_directory(package_dir);
            scanned_count += 1;
        } else if path.is_file() {
            info!("Scanning pytest plugin: {} -> {:?}", entry.name, path);
            self.scan_single_plugin_file(&path);
            scanned_count += 1;
        } else {
            debug!(
                "Resolved module path for plugin {} is not a file: {:?}",
                entry.name, path
            );
        }
    }
    scanned_count
}
/// Scans the `_pytest` package inside `site_packages` and then registers the
/// synthetic `request` fixture.
///
/// Does nothing beyond a debug log when `_pytest` is not an existing directory.
fn scan_pytest_internal_fixtures(&self, site_packages: &Path) {
    let pytest_internal = site_packages.join("_pytest");
    // `is_dir` is already false for a path that does not exist.
    if !pytest_internal.is_dir() {
        debug!("_pytest directory not found in site-packages");
        return;
    }

    info!(
        "Scanning pytest internal fixtures in: {:?}",
        pytest_internal
    );
    self.scan_plugin_directory(&pytest_internal);
    self.register_request_builtin_fixture(&pytest_internal);
}
/// Registers a hand-built definition for the `request` fixture.
///
/// The definition is attached to `<pytest_internal>/fixtures.py` when that
/// file exists (canonicalized if possible) and to
/// `<pytest_internal>/_pytest_request_builtin.py` otherwise. If a `request`
/// definition for that same file is already recorded, nothing happens;
/// otherwise every existing `request` definition is removed and the new one is
/// recorded.
fn register_request_builtin_fixture(&self, pytest_internal: &Path) {
    let real_fixtures_module = pytest_internal.join("fixtures.py");
    let file_path = if real_fixtures_module.exists() {
        match real_fixtures_module.canonicalize() {
            Ok(resolved) => resolved,
            Err(_) => real_fixtures_module.clone(),
        }
    } else {
        pytest_internal.join("_pytest_request_builtin.py")
    };

    // The lookup is finished (and its guard released) before `remove` runs.
    let already_registered = self
        .definitions
        .get("request")
        .is_some_and(|existing| existing.iter().any(|d| d.file_path == file_path));
    if already_registered {
        debug!(
            "Synthetic 'request' fixture already registered for {:?}, skipping",
            file_path
        );
        return;
    }
    let _ = self.definitions.remove("request");

    let docstring = "Special fixture providing information about the requesting test context.\n\nSee https://docs.pytest.org/en/stable/reference/reference.html#request";
    let fixture_name = "request";
    let return_type_imports = vec![TypeImportSpec {
        check_name: "FixtureRequest".to_string(),
        import_statement: "from pytest import FixtureRequest".to_string(),
    }];
    let definition = FixtureDefinition {
        name: fixture_name.to_string(),
        file_path,
        line: 1,
        end_line: 1,
        start_char: 0,
        end_char: fixture_name.len(),
        docstring: Some(docstring.to_string()),
        return_type: Some("FixtureRequest".to_string()),
        return_type_imports,
        is_third_party: true,
        is_plugin: true,
        dependencies: vec![],
        scope: FixtureScope::Function,
        yield_line: None,
        autouse: false,
    };

    info!("Registering synthetic 'request' fixture definition");
    self.record_fixture_definition(definition);
}
/// Derives a package name from a `.dist-info` / `.egg-info` directory name.
///
/// The metadata suffix is removed, then everything from the first `-` that is
/// immediately followed by an ASCII digit (the start of the version) is cut
/// off. If no such `-` exists the whole remainder is the name.
///
/// Returns `(raw, normalized)`, where `raw` is the name as written and
/// `normalized` is `raw` with `-` and `.` replaced by `_` and lowercased.
/// Returns `None` when `dir_name` has neither suffix.
fn extract_package_name_from_dist_info(dir_name: &str) -> Option<(String, String)> {
    let name_version = dir_name
        .strip_suffix(".dist-info")
        .or_else(|| dir_name.strip_suffix(".egg-info"))?;

    // `match_indices` yields byte offsets, which is what slicing needs. A
    // char count would give a wrong slice, or a panic on a non-boundary
    // index, for names containing multi-byte characters.
    let version_start = name_version
        .match_indices('-')
        .map(|(byte_idx, _)| byte_idx)
        .find(|&byte_idx| {
            name_version[byte_idx + 1..].starts_with(|c: char| c.is_ascii_digit())
        });
    let name = match version_start {
        Some(byte_idx) => &name_version[..byte_idx],
        None => name_version,
    };

    let raw = name.to_string();
    let normalized = name.replace(['-', '.'], "_").to_lowercase();
    Some((raw, normalized))
}
/// Rebuilds `editable_install_roots` from the metadata in `site_packages`.
///
/// The list is cleared first. Every `*.dist-info` directory whose
/// `direct_url.json` has `dir_info.editable == true` is then matched against
/// the `.pth` files of `site_packages` to find its source root. Installs for
/// which no source root can be determined are skipped. Nothing is changed if
/// `site_packages` is not a directory.
fn discover_editable_installs(&self, site_packages: &Path) {
    info!("Scanning for editable installs in: {:?}", site_packages);
    if !site_packages.is_dir() {
        warn!(
            "site-packages path is not a directory, skipping editable install scan: {:?}",
            site_packages
        );
        return;
    }

    self.editable_install_roots.lock().unwrap().clear();
    let pth_index = Self::build_pth_index(site_packages);
    let Ok(entries) = std::fs::read_dir(site_packages) else {
        return;
    };

    for entry in entries.flatten() {
        let dist_info_dir = entry.path();
        let filename = dist_info_dir
            .file_name()
            .unwrap_or_default()
            .to_string_lossy();
        if !filename.ends_with(".dist-info") {
            continue;
        }

        // A missing or malformed direct_url.json means "not editable".
        let Ok(content) = std::fs::read_to_string(dist_info_dir.join("direct_url.json")) else {
            continue;
        };
        let Ok(json) = serde_json::from_str::<serde_json::Value>(&content) else {
            continue;
        };
        let editable_flag = json
            .get("dir_info")
            .and_then(|d| d.get("editable"))
            .and_then(|e| e.as_bool());
        if editable_flag != Some(true) {
            continue;
        }

        let Some((raw_name, normalized_name)) =
            Self::extract_package_name_from_dist_info(&filename)
        else {
            continue;
        };

        let Some(source_root) = Self::find_editable_pth_source_root(
            &pth_index,
            &raw_name,
            &normalized_name,
            site_packages,
        ) else {
            debug!(
                "No .pth file found for editable install: {}",
                normalized_name
            );
            continue;
        };

        info!(
            "Discovered editable install: {} -> {:?}",
            normalized_name, source_root
        );
        let install = super::EditableInstall {
            package_name: normalized_name,
            raw_package_name: raw_name,
            source_root,
            site_packages: site_packages.to_path_buf(),
        };
        self.editable_install_roots.lock().unwrap().push(install);
    }

    let count = self.editable_install_roots.lock().unwrap().len();
    info!("Discovered {} editable install(s)", count);
}
/// Maps the stem of every `*.pth` file directly inside `site_packages` to the
/// file's path.
///
/// The stem is the (lossily decoded) file name without the `.pth` suffix. An
/// empty map is returned when `site_packages` is not a directory or cannot be
/// listed.
fn build_pth_index(site_packages: &Path) -> std::collections::HashMap<String, PathBuf> {
    if !site_packages.is_dir() {
        return std::collections::HashMap::new();
    }
    let Ok(entries) = std::fs::read_dir(site_packages) else {
        return std::collections::HashMap::new();
    };

    entries
        .flatten()
        .filter_map(|entry| {
            let file_name = entry.file_name();
            let lossy_name = file_name.to_string_lossy();
            let stem = lossy_name.strip_suffix(".pth")?;
            Some((stem.to_string(), entry.path()))
        })
        .collect()
}
/// Finds the source directory of an editable install from its `.pth` file.
///
/// A `.pth` stem matches when it equals `__editable__.<name>`, `_<name>` or
/// `<name>` (for the normalized name and, if different, the raw name), or when
/// it is one of those followed by `-` and an ASCII digit. Within a matching
/// file, blank lines, `#` comments and `import ` lines are ignored, and lines
/// containing NUL, other control bytes (except tab) or `..` are rejected.
/// Relative entries are resolved against `site_packages`. The first entry
/// that canonicalizes to an existing directory is returned.
fn find_editable_pth_source_root(
    pth_index: &std::collections::HashMap<String, PathBuf>,
    raw_name: &str,
    normalized_name: &str,
    site_packages: &Path,
) -> Option<PathBuf> {
    let stem_forms = |name: &str| {
        [
            format!("__editable__.{}", name),
            format!("_{}", name),
            name.to_string(),
        ]
    };
    let mut candidates: Vec<String> = stem_forms(normalized_name).into();
    if raw_name != normalized_name {
        candidates.extend(stem_forms(raw_name));
    }

    // True for a remainder such as "-1.2.3": a dash followed by a digit.
    let is_version_suffix = |rest: &str| {
        let mut chars = rest.chars();
        chars.next() == Some('-') && chars.next().is_some_and(|ch| ch.is_ascii_digit())
    };

    for (stem, pth_path) in pth_index {
        let stem_matches = candidates.iter().any(|candidate| {
            stem == candidate
                || stem
                    .strip_prefix(candidate.as_str())
                    .is_some_and(is_version_suffix)
        });
        if !stem_matches {
            continue;
        }
        let Ok(content) = std::fs::read_to_string(pth_path) else {
            continue;
        };

        for raw_line in content.lines() {
            let line = raw_line.trim();
            let is_ignorable =
                line.is_empty() || line.starts_with('#') || line.starts_with("import ");
            if is_ignorable {
                continue;
            }
            let has_control_byte = line.bytes().any(|b| b < 0x20 && b != b'\t');
            if line.contains('\0') || has_control_byte || line.contains("..") {
                debug!("Skipping .pth line with invalid characters: {:?}", line);
                continue;
            }

            let listed = PathBuf::from(line);
            let resolved = if listed.is_absolute() {
                listed
            } else {
                site_packages.join(&listed)
            };
            match resolved.canonicalize() {
                Ok(canonical) => {
                    if canonical.is_dir() {
                        return Some(canonical);
                    }
                    debug!(".pth path is not a directory: {:?}", canonical);
                }
                Err(_) => {
                    debug!("Could not canonicalize .pth path: {:?}", resolved);
                }
            }
        }
    }
    None
}
/// Resolves `module_path` against the source root of each known editable
/// install, in list order, and returns the first match.
///
/// The `editable_install_roots` lock is held for the duration of the search.
fn resolve_entry_point_in_editable_installs(&self, module_path: &str) -> Option<PathBuf> {
    let installs = self.editable_install_roots.lock().unwrap();
    let first_match = installs.iter().find_map(|install| {
        Self::resolve_entry_point_module_to_path(&install.source_root, module_path)
    });
    first_match
}
/// Discovers pytest plugins installed in `site_packages`.
///
/// In order: editable installs are rediscovered, pytest's own `_pytest`
/// package is scanned, and then every `*.dist-info` / `*.egg-info` directory
/// is checked for pytest11 entry points. Unreadable directory entries are
/// skipped, as is the whole listing if `site_packages` cannot be read.
fn scan_pytest_plugins(&self, site_packages: &Path) {
    info!(
        "Scanning for pytest plugins via entry points in: {:?}",
        site_packages
    );
    self.discover_editable_installs(site_packages);
    let mut plugin_count = 0;
    self.scan_pytest_internal_fixtures(site_packages);

    let dir_entries = std::fs::read_dir(site_packages)
        .into_iter()
        .flatten()
        .flatten();
    for entry in dir_entries {
        let path = entry.path();
        let filename = path.file_name().unwrap_or_default().to_string_lossy();
        let is_metadata_dir =
            filename.ends_with(".dist-info") || filename.ends_with(".egg-info");
        if !is_metadata_dir {
            continue;
        }

        let scanned = self.load_plugin_from_entry_point(&path, site_packages);
        if scanned == 0 {
            continue;
        }
        plugin_count += scanned;
        debug!("Loaded {} plugin module(s) from {}", scanned, filename);
    }

    info!(
        "Discovered fixtures from {} pytest plugin modules",
        plugin_count
    );
}
/// Registers and analyzes the `.py` files found under `plugin_dir`.
///
/// The walk is limited to a depth of 3 and silently drops entries that cannot
/// be read. Files whose name starts with `test_` or contains `__pycache__`,
/// and files whose name is not valid UTF-8, are skipped. Every remaining file
/// is inserted into `plugin_fixture_files` under its canonical path and
/// analyzed if it can be read.
fn scan_plugin_directory(&self, plugin_dir: &Path) {
    let walker = WalkDir::new(plugin_dir)
        .max_depth(3)
        .into_iter()
        .filter_map(Result::ok);

    for entry in walker {
        let path = entry.path();
        if path.extension().and_then(|ext| ext.to_str()) != Some("py") {
            continue;
        }
        let Some(filename) = path.file_name().and_then(|n| n.to_str()) else {
            continue;
        };
        if filename.starts_with("test_") || filename.contains("__pycache__") {
            continue;
        }

        debug!("Scanning plugin file: {:?}", path);
        let plugin_key = match path.canonicalize() {
            Ok(resolved) => resolved,
            Err(_) => path.to_path_buf(),
        };
        self.plugin_fixture_files.insert(plugin_key, ());
        if let Ok(source) = std::fs::read_to_string(path) {
            self.analyze_file(path.to_path_buf(), &source);
        }
    }
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
use tempfile::tempdir;
#[test]
fn test_parse_pytest11_entry_points_basic() {
let content = r#"
[console_scripts]
my-cli = my_package:main
[pytest11]
my_plugin = my_package.plugin
another = another_pkg
[other_section]
foo = bar
"#;
let entries = FixtureDatabase::parse_pytest11_entry_points(content);
assert_eq!(entries.len(), 2);
assert_eq!(entries[0].name, "my_plugin");
assert_eq!(entries[0].module_path, "my_package.plugin");
assert_eq!(entries[1].name, "another");
assert_eq!(entries[1].module_path, "another_pkg");
}
#[test]
fn test_parse_pytest11_entry_points_empty_file() {
let entries = FixtureDatabase::parse_pytest11_entry_points("");
assert!(entries.is_empty());
}
#[test]
fn test_parse_pytest11_entry_points_no_pytest11_section() {
let content = r#"
[console_scripts]
my-cli = my_package:main
"#;
let entries = FixtureDatabase::parse_pytest11_entry_points(content);
assert!(entries.is_empty());
}
#[test]
fn test_parse_pytest11_entry_points_with_comments() {
let content = r#"
[pytest11]
# This is a comment
my_plugin = my_package.plugin
# Another comment
"#;
let entries = FixtureDatabase::parse_pytest11_entry_points(content);
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].name, "my_plugin");
}
#[test]
fn test_parse_pytest11_entry_points_with_whitespace() {
let content = r#"
[pytest11]
my_plugin = my_package.plugin
another=another_pkg
"#;
let entries = FixtureDatabase::parse_pytest11_entry_points(content);
assert_eq!(entries.len(), 2);
assert_eq!(entries[0].name, "my_plugin");
assert_eq!(entries[0].module_path, "my_package.plugin");
assert_eq!(entries[1].name, "another");
assert_eq!(entries[1].module_path, "another_pkg");
}
#[test]
fn test_parse_pytest11_entry_points_with_attr() {
let content = r#"
[pytest11]
my_plugin = my_package.module:plugin_entry
"#;
let entries = FixtureDatabase::parse_pytest11_entry_points(content);
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].module_path, "my_package.module:plugin_entry");
}
#[test]
fn test_parse_pytest11_entry_points_multiple_sections_before_pytest11() {
let content = r#"
[console_scripts]
cli = pkg:main
[gui_scripts]
gui = pkg:gui_main
[pytest11]
my_plugin = my_package.plugin
[other]
extra = something
"#;
let entries = FixtureDatabase::parse_pytest11_entry_points(content);
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].name, "my_plugin");
}
#[test]
fn test_resolve_entry_point_module_to_path_package() {
let temp = tempdir().unwrap();
let site_packages = temp.path();
let pkg_dir = site_packages.join("my_plugin");
fs::create_dir_all(&pkg_dir).unwrap();
fs::write(pkg_dir.join("__init__.py"), "# plugin code").unwrap();
let result =
FixtureDatabase::resolve_entry_point_module_to_path(site_packages, "my_plugin");
assert!(result.is_some());
assert_eq!(
result.unwrap(),
pkg_dir.join("__init__.py").canonicalize().unwrap()
);
}
#[test]
fn test_resolve_entry_point_module_to_path_submodule() {
let temp = tempdir().unwrap();
let site_packages = temp.path();
let pkg_dir = site_packages.join("my_plugin");
fs::create_dir_all(&pkg_dir).unwrap();
fs::write(pkg_dir.join("__init__.py"), "").unwrap();
fs::write(pkg_dir.join("plugin.py"), "# plugin code").unwrap();
let result =
FixtureDatabase::resolve_entry_point_module_to_path(site_packages, "my_plugin.plugin");
assert!(result.is_some());
assert_eq!(
result.unwrap(),
pkg_dir.join("plugin.py").canonicalize().unwrap()
);
}
#[test]
fn test_resolve_entry_point_module_to_path_single_file() {
let temp = tempdir().unwrap();
let site_packages = temp.path();
fs::write(site_packages.join("my_plugin.py"), "# plugin code").unwrap();
let result =
FixtureDatabase::resolve_entry_point_module_to_path(site_packages, "my_plugin");
assert!(result.is_some());
assert_eq!(
result.unwrap(),
site_packages.join("my_plugin.py").canonicalize().unwrap()
);
}
#[test]
fn test_resolve_entry_point_module_to_path_not_found() {
let temp = tempdir().unwrap();
let site_packages = temp.path();
let result = FixtureDatabase::resolve_entry_point_module_to_path(
site_packages,
"nonexistent_plugin",
);
assert!(result.is_none());
}
#[test]
fn test_resolve_entry_point_module_strips_attr() {
let temp = tempdir().unwrap();
let site_packages = temp.path();
let pkg_dir = site_packages.join("my_plugin");
fs::create_dir_all(&pkg_dir).unwrap();
fs::write(pkg_dir.join("__init__.py"), "").unwrap();
fs::write(pkg_dir.join("module.py"), "# plugin code").unwrap();
let result = FixtureDatabase::resolve_entry_point_module_to_path(
site_packages,
"my_plugin.module:entry_function",
);
assert!(result.is_some());
assert_eq!(
result.unwrap(),
pkg_dir.join("module.py").canonicalize().unwrap()
);
}
#[test]
fn test_resolve_entry_point_rejects_path_traversal() {
let temp = tempdir().unwrap();
let site_packages = temp.path();
fs::write(site_packages.join("valid.py"), "# code").unwrap();
let result =
FixtureDatabase::resolve_entry_point_module_to_path(site_packages, "..%2Fetc%2Fpasswd");
assert!(result.is_none(), "should reject traversal-like pattern");
let result =
FixtureDatabase::resolve_entry_point_module_to_path(site_packages, "valid...secret");
assert!(
result.is_none(),
"should reject module names with consecutive dots (empty segments)"
);
let result =
FixtureDatabase::resolve_entry_point_module_to_path(site_packages, "pkg..evil");
assert!(
result.is_none(),
"should reject module names with consecutive dots"
);
}
#[test]
fn test_resolve_entry_point_rejects_null_bytes() {
let temp = tempdir().unwrap();
let site_packages = temp.path();
let result =
FixtureDatabase::resolve_entry_point_module_to_path(site_packages, "module\0name");
assert!(result.is_none(), "should reject null bytes");
}
#[test]
fn test_resolve_entry_point_rejects_empty_segments() {
let temp = tempdir().unwrap();
let site_packages = temp.path();
let result = FixtureDatabase::resolve_entry_point_module_to_path(site_packages, "foo..bar");
assert!(result.is_none(), "should reject empty path segments");
}
#[cfg(unix)]
#[test]
fn test_resolve_entry_point_rejects_symlink_escape() {
let temp = tempdir().unwrap();
let site_packages = temp.path();
let outside = tempdir().unwrap();
fs::write(outside.path().join("evil.py"), "# malicious").unwrap();
std::os::unix::fs::symlink(outside.path(), site_packages.join("escaped")).unwrap();
let result =
FixtureDatabase::resolve_entry_point_module_to_path(site_packages, "escaped.evil");
assert!(
result.is_none(),
"should reject paths that escape site-packages via symlink"
);
}
#[test]
fn test_entry_point_plugin_discovery_integration() {
let temp = tempdir().unwrap();
let site_packages = temp.path();
let plugin_dir = site_packages.join("my_pytest_plugin");
fs::create_dir_all(&plugin_dir).unwrap();
let plugin_content = r#"
import pytest
@pytest.fixture
def my_dynamic_fixture():
"""A fixture discovered via entry points."""
return "discovered via entry point"
@pytest.fixture
def another_dynamic_fixture():
return 42
"#;
fs::write(plugin_dir.join("__init__.py"), plugin_content).unwrap();
let dist_info = site_packages.join("my_pytest_plugin-1.0.0.dist-info");
fs::create_dir_all(&dist_info).unwrap();
let entry_points = "[pytest11]\nmy_plugin = my_pytest_plugin\n";
fs::write(dist_info.join("entry_points.txt"), entry_points).unwrap();
let db = FixtureDatabase::new();
db.scan_pytest_plugins(site_packages);
assert!(
db.definitions.contains_key("my_dynamic_fixture"),
"my_dynamic_fixture should be discovered"
);
assert!(
db.definitions.contains_key("another_dynamic_fixture"),
"another_dynamic_fixture should be discovered"
);
}
#[test]
fn test_entry_point_discovery_submodule() {
let temp = tempdir().unwrap();
let site_packages = temp.path();
let plugin_dir = site_packages.join("my_pytest_plugin");
fs::create_dir_all(&plugin_dir).unwrap();
fs::write(plugin_dir.join("__init__.py"), "# main init").unwrap();
let plugin_content = r#"
import pytest
@pytest.fixture
def submodule_fixture():
return "from submodule"
"#;
fs::write(plugin_dir.join("plugin.py"), plugin_content).unwrap();
let dist_info = site_packages.join("my_pytest_plugin-1.0.0.dist-info");
fs::create_dir_all(&dist_info).unwrap();
let entry_points = "[pytest11]\nmy_plugin = my_pytest_plugin.plugin\n";
fs::write(dist_info.join("entry_points.txt"), entry_points).unwrap();
let db = FixtureDatabase::new();
db.scan_pytest_plugins(site_packages);
assert!(
db.definitions.contains_key("submodule_fixture"),
"submodule_fixture should be discovered"
);
}
/// A `pytest11` entry point that names a package should also surface a
/// fixture defined in a sibling module (`fixtures.py`) inside that package.
#[test]
fn test_entry_point_discovery_package_scans_submodules() {
    let sandbox = tempdir().unwrap();
    let site_packages = sandbox.path();

    let package_root = site_packages.join("my_pytest_plugin");
    fs::create_dir_all(&package_root).unwrap();
    fs::write(package_root.join("__init__.py"), "# package init").unwrap();
    let fixtures_source = r#"
import pytest
@pytest.fixture
def package_submodule_fixture():
return "from package submodule"
"#;
    fs::write(package_root.join("fixtures.py"), fixtures_source).unwrap();

    // The entry point references only the package, not `fixtures.py`.
    let metadata_dir = site_packages.join("my_pytest_plugin-1.0.0.dist-info");
    fs::create_dir_all(&metadata_dir).unwrap();
    fs::write(
        metadata_dir.join("entry_points.txt"),
        "[pytest11]\nmy_plugin = my_pytest_plugin\n",
    )
    .unwrap();

    let db = FixtureDatabase::new();
    db.scan_pytest_plugins(site_packages);

    let discovered = db.definitions.contains_key("package_submodule_fixture");
    assert!(discovered, "package_submodule_fixture should be discovered");
}
/// A package whose `entry_points.txt` has only a `[console_scripts]` section
/// must not contribute fixtures.
#[test]
fn test_entry_point_discovery_no_pytest11_section() {
    let sandbox = tempdir().unwrap();
    let site_packages = sandbox.path();

    let package_root = site_packages.join("some_package");
    fs::create_dir_all(&package_root).unwrap();
    fs::write(
        package_root.join("__init__.py"),
        r#"
import pytest
@pytest.fixture
def should_not_be_found():
return "this package is not a pytest plugin"
"#,
    )
    .unwrap();

    // Entry points exist, but none of them is in a `[pytest11]` section.
    let metadata_dir = site_packages.join("some_package-1.0.0.dist-info");
    fs::create_dir_all(&metadata_dir).unwrap();
    fs::write(
        metadata_dir.join("entry_points.txt"),
        "[console_scripts]\nsome_cli = some_package:main\n",
    )
    .unwrap();

    let db = FixtureDatabase::new();
    db.scan_pytest_plugins(site_packages);

    let discovered = db.definitions.contains_key("should_not_be_found");
    assert!(
        !discovered,
        "should_not_be_found should NOT be discovered (not a pytest plugin)"
    );
}
/// A `.dist-info` directory without any `entry_points.txt` must not cause the
/// package's fixtures to be indexed.
#[test]
fn test_entry_point_discovery_missing_entry_points_txt() {
    let sandbox = tempdir().unwrap();
    let site_packages = sandbox.path();

    let package_root = site_packages.join("some_package");
    fs::create_dir_all(&package_root).unwrap();
    fs::write(
        package_root.join("__init__.py"),
        r#"
import pytest
@pytest.fixture
def should_not_be_found():
return "no entry_points.txt"
"#,
    )
    .unwrap();

    // The metadata directory is deliberately left empty.
    fs::create_dir_all(site_packages.join("some_package-1.0.0.dist-info")).unwrap();

    let db = FixtureDatabase::new();
    db.scan_pytest_plugins(site_packages);

    let discovered = db.definitions.contains_key("should_not_be_found");
    assert!(
        !discovered,
        "should_not_be_found should NOT be discovered (no entry_points.txt)"
    );
}
/// Entry points declared in an `.egg-info` directory should be honoured the
/// same way as those in `.dist-info`.
#[test]
fn test_entry_point_discovery_egg_info() {
    let sandbox = tempdir().unwrap();
    let site_packages = sandbox.path();

    let package_root = site_packages.join("legacy_plugin");
    fs::create_dir_all(&package_root).unwrap();
    let package_source = r#"
import pytest
@pytest.fixture
def legacy_plugin_fixture():
return "from egg-info"
"#;
    fs::write(package_root.join("__init__.py"), package_source).unwrap();

    let metadata_dir = site_packages.join("legacy_plugin-1.0.0.egg-info");
    fs::create_dir_all(&metadata_dir).unwrap();
    fs::write(
        metadata_dir.join("entry_points.txt"),
        "[pytest11]\nlegacy_plugin = legacy_plugin\n",
    )
    .unwrap();

    let db = FixtureDatabase::new();
    db.scan_pytest_plugins(site_packages);

    let discovered = db.definitions.contains_key("legacy_plugin_fixture");
    assert!(discovered, "legacy_plugin_fixture should be discovered");
}
/// Two independent plugin distributions in the same site-packages should both
/// have their fixtures indexed.
#[test]
fn test_entry_point_discovery_multiple_plugins() {
    let sandbox = tempdir().unwrap();
    let site_packages = sandbox.path();

    // (package dir, metadata dir, `__init__.py` source, entry_points.txt body)
    let plugins = [
        (
            "plugin_one",
            "plugin_one-1.0.0.dist-info",
            r#"
import pytest
@pytest.fixture
def fixture_from_plugin_one():
return 1
"#,
            "[pytest11]\nplugin_one = plugin_one\n",
        ),
        (
            "plugin_two",
            "plugin_two-2.0.0.dist-info",
            r#"
import pytest
@pytest.fixture
def fixture_from_plugin_two():
return 2
"#,
            "[pytest11]\nplugin_two = plugin_two\n",
        ),
    ];

    // Lay out each distribution: package first, then its metadata directory.
    for (package, metadata, init_source, entry_points) in plugins {
        let package_root = site_packages.join(package);
        fs::create_dir_all(&package_root).unwrap();
        fs::write(package_root.join("__init__.py"), init_source).unwrap();

        let metadata_dir = site_packages.join(metadata);
        fs::create_dir_all(&metadata_dir).unwrap();
        fs::write(metadata_dir.join("entry_points.txt"), entry_points).unwrap();
    }

    let db = FixtureDatabase::new();
    db.scan_pytest_plugins(site_packages);

    let found_one = db.definitions.contains_key("fixture_from_plugin_one");
    assert!(found_one, "fixture_from_plugin_one should be discovered");
    let found_two = db.definitions.contains_key("fixture_from_plugin_two");
    assert!(found_two, "fixture_from_plugin_two should be discovered");
}
/// A single distribution may declare several `pytest11` entries; each
/// referenced module's fixtures should be indexed.
#[test]
fn test_entry_point_discovery_multiple_entries_in_one_package() {
    let sandbox = tempdir().unwrap();
    let site_packages = sandbox.path();

    let package_root = site_packages.join("multi_plugin");
    fs::create_dir_all(&package_root).unwrap();
    fs::write(package_root.join("__init__.py"), "").unwrap();

    let module_a = r#"
import pytest
@pytest.fixture
def fixture_a():
return "A"
"#;
    fs::write(package_root.join("fixtures_a.py"), module_a).unwrap();

    let module_b = r#"
import pytest
@pytest.fixture
def fixture_b():
return "B"
"#;
    fs::write(package_root.join("fixtures_b.py"), module_b).unwrap();

    // One entry_points.txt with two entries under the same section.
    let entry_points = r#"[pytest11]
fixtures_a = multi_plugin.fixtures_a
fixtures_b = multi_plugin.fixtures_b
"#;
    let metadata_dir = site_packages.join("multi_plugin-1.0.0.dist-info");
    fs::create_dir_all(&metadata_dir).unwrap();
    fs::write(metadata_dir.join("entry_points.txt"), entry_points).unwrap();

    let db = FixtureDatabase::new();
    db.scan_pytest_plugins(site_packages);

    let found_a = db.definitions.contains_key("fixture_a");
    assert!(found_a, "fixture_a should be discovered");
    let found_b = db.definitions.contains_key("fixture_b");
    assert!(found_b, "fixture_b should be discovered");
}
/// Fixtures under a `_pytest` directory should be indexed even though no
/// distribution metadata or entry points are present.
#[test]
fn test_pytest_internal_fixtures_scanned() {
    let sandbox = tempdir().unwrap();
    let site_packages = sandbox.path();

    let internal_dir = site_packages.join("_pytest");
    fs::create_dir_all(&internal_dir).unwrap();
    fs::write(
        internal_dir.join("fixtures.py"),
        r#"
import pytest
@pytest.fixture
def tmp_path():
"""Pytest's built-in tmp_path fixture."""
pass
@pytest.fixture
def capsys():
"""Pytest's built-in capsys fixture."""
pass
"#,
    )
    .unwrap();

    let db = FixtureDatabase::new();
    db.scan_pytest_plugins(site_packages);

    let has_tmp_path = db.definitions.contains_key("tmp_path");
    assert!(has_tmp_path, "tmp_path should be discovered from _pytest");
    let has_capsys = db.definitions.contains_key("capsys");
    assert!(has_capsys, "capsys should be discovered from _pytest");
}
/// Table-driven check of `extract_package_name_from_dist_info`: each row is a
/// metadata directory name with the expected pair of names it should yield.
#[test]
fn test_extract_package_name_from_dist_info() {
    // (directory name, first tuple element, second tuple element)
    let cases = [
        ("mypackage-1.0.0.dist-info", "mypackage", "mypackage"),
        ("my-package-1.0.0.dist-info", "my-package", "my_package"),
        ("My.Package-2.3.4.dist-info", "My.Package", "my_package"),
        ("pytest_mock-3.12.0.dist-info", "pytest_mock", "pytest_mock"),
        ("mypackage-0.1.0.egg-info", "mypackage", "mypackage"),
        // No version component at all.
        ("mypackage.dist-info", "mypackage", "mypackage"),
    ];

    for (dir_name, raw, normalized) in cases {
        assert_eq!(
            FixtureDatabase::extract_package_name_from_dist_info(dir_name),
            Some((raw.to_string(), normalized.to_string()))
        );
    }
}
/// An editable install described by `direct_url.json` plus an
/// `__editable__.*.pth` file should be recorded with its source root.
#[test]
fn test_discover_editable_installs() {
    let sandbox = tempdir().unwrap();
    let site_packages = sandbox.path();
    let source_root = tempdir().unwrap();
    let source_path = source_root.path();

    // The package source lives outside site-packages.
    let package_root = source_path.join("mypackage");
    fs::create_dir_all(&package_root).unwrap();
    fs::write(package_root.join("__init__.py"), "").unwrap();

    let metadata_dir = site_packages.join("mypackage-0.1.0.dist-info");
    fs::create_dir_all(&metadata_dir).unwrap();
    let direct_url = serde_json::json!({
        "url": format!("file://{}", source_path.display()),
        "dir_info": {
            "editable": true
        }
    });
    let direct_url_text = serde_json::to_string(&direct_url).unwrap();
    fs::write(metadata_dir.join("direct_url.json"), direct_url_text).unwrap();

    // The .pth file contains the source root followed by a newline.
    fs::write(
        site_packages.join("__editable__.mypackage-0.1.0.pth"),
        format!("{}\n", source_path.display()),
    )
    .unwrap();

    let db = FixtureDatabase::new();
    db.discover_editable_installs(site_packages);

    let installs = db.editable_install_roots.lock().unwrap();
    assert_eq!(installs.len(), 1, "Should discover one editable install");
    let install = &installs[0];
    assert_eq!(install.package_name, "mypackage");
    assert_eq!(install.source_root, source_path.canonicalize().unwrap());
}
/// A distribution named with dashes (`my-package`) should be found through
/// its dashed `.pth` file and reported as `my_package`.
#[test]
fn test_discover_editable_installs_pth_with_dashes() {
    let sandbox = tempdir().unwrap();
    let site_packages = sandbox.path();
    let source_root = tempdir().unwrap();
    let source_path = source_root.path();

    let package_root = source_path.join("my_package");
    fs::create_dir_all(&package_root).unwrap();
    fs::write(package_root.join("__init__.py"), "").unwrap();

    let metadata_dir = site_packages.join("my-package-0.1.0.dist-info");
    fs::create_dir_all(&metadata_dir).unwrap();
    let direct_url = serde_json::json!({
        "url": format!("file://{}", source_path.display()),
        "dir_info": { "editable": true }
    });
    let direct_url_text = serde_json::to_string(&direct_url).unwrap();
    fs::write(metadata_dir.join("direct_url.json"), direct_url_text).unwrap();

    fs::write(
        site_packages.join("__editable__.my-package-0.1.0.pth"),
        format!("{}\n", source_path.display()),
    )
    .unwrap();

    let db = FixtureDatabase::new();
    db.discover_editable_installs(site_packages);

    let installs = db.editable_install_roots.lock().unwrap();
    assert_eq!(
        installs.len(),
        1,
        "Should discover editable install from .pth with dashes"
    );
    let install = &installs[0];
    assert_eq!(install.package_name, "my_package");
    assert_eq!(install.source_root, source_path.canonicalize().unwrap());
}
/// A distribution named with dots and capitals (`My.Package`) should be found
/// through its `.pth` file and reported as `my_package`.
#[test]
fn test_discover_editable_installs_pth_with_dots() {
    let sandbox = tempdir().unwrap();
    let site_packages = sandbox.path();
    let source_root = tempdir().unwrap();
    let source_path = source_root.path();

    fs::create_dir_all(source_path.join("my_package")).unwrap();
    fs::write(source_path.join("my_package/__init__.py"), "").unwrap();

    let metadata_dir = site_packages.join("My.Package-1.0.0.dist-info");
    fs::create_dir_all(&metadata_dir).unwrap();
    let direct_url = serde_json::json!({
        "url": format!("file://{}", source_path.display()),
        "dir_info": { "editable": true }
    });
    let direct_url_text = serde_json::to_string(&direct_url).unwrap();
    fs::write(metadata_dir.join("direct_url.json"), direct_url_text).unwrap();

    fs::write(
        site_packages.join("__editable__.My.Package-1.0.0.pth"),
        format!("{}\n", source_path.display()),
    )
    .unwrap();

    let db = FixtureDatabase::new();
    db.discover_editable_installs(site_packages);

    let installs = db.editable_install_roots.lock().unwrap();
    assert_eq!(
        installs.len(),
        1,
        "Should discover editable install from .pth with dots"
    );
    let install = &installs[0];
    assert_eq!(install.package_name, "my_package");
    assert_eq!(install.source_root, source_path.canonicalize().unwrap());
}
/// Running editable-install discovery twice over the same directory must not
/// record the same install twice.
#[test]
fn test_discover_editable_installs_dedup_on_rescan() {
    let sandbox = tempdir().unwrap();
    let site_packages = sandbox.path();
    let source_root = tempdir().unwrap();
    let source_path = source_root.path();

    fs::create_dir_all(source_path.join("pkg")).unwrap();
    fs::write(source_path.join("pkg/__init__.py"), "").unwrap();

    let metadata_dir = site_packages.join("pkg-0.1.0.dist-info");
    fs::create_dir_all(&metadata_dir).unwrap();
    let direct_url = serde_json::json!({
        "url": format!("file://{}", source_path.display()),
        "dir_info": { "editable": true }
    });
    let direct_url_text = serde_json::to_string(&direct_url).unwrap();
    fs::write(metadata_dir.join("direct_url.json"), direct_url_text).unwrap();

    fs::write(
        site_packages.join("pkg.pth"),
        format!("{}\n", source_path.display()),
    )
    .unwrap();

    let db = FixtureDatabase::new();
    // Two consecutive passes over identical input.
    for _ in 0..2 {
        db.discover_editable_installs(site_packages);
    }

    let installs = db.editable_install_roots.lock().unwrap();
    assert_eq!(
        installs.len(),
        1,
        "Re-scanning should not produce duplicates"
    );
}
/// A `pytest11` entry point of an editable install should resolve to the
/// module under the external source root, not under site-packages.
#[test]
fn test_editable_install_entry_point_resolution() {
    let sandbox = tempdir().unwrap();
    let site_packages = sandbox.path();
    let source_root = tempdir().unwrap();
    let source_path = source_root.path();

    // Package sources (with the fixture in `plugin.py`) live in the source root.
    let package_root = source_path.join("mypackage");
    fs::create_dir_all(&package_root).unwrap();
    let plugin_source = r#"
import pytest
@pytest.fixture
def editable_fixture():
return "from editable install"
"#;
    fs::write(package_root.join("__init__.py"), "").unwrap();
    fs::write(package_root.join("plugin.py"), plugin_source).unwrap();

    // site-packages only carries metadata: direct_url.json, entry points, .pth.
    let metadata_dir = site_packages.join("mypackage-0.1.0.dist-info");
    fs::create_dir_all(&metadata_dir).unwrap();
    let direct_url = serde_json::json!({
        "url": format!("file://{}", source_path.display()),
        "dir_info": { "editable": true }
    });
    let direct_url_text = serde_json::to_string(&direct_url).unwrap();
    fs::write(metadata_dir.join("direct_url.json"), direct_url_text).unwrap();
    fs::write(
        metadata_dir.join("entry_points.txt"),
        "[pytest11]\nmypackage = mypackage.plugin\n",
    )
    .unwrap();
    fs::write(
        site_packages.join("__editable__.mypackage-0.1.0.pth"),
        format!("{}\n", source_path.display()),
    )
    .unwrap();

    let db = FixtureDatabase::new();
    db.scan_pytest_plugins(site_packages);

    let discovered = db.definitions.contains_key("editable_fixture");
    assert!(
        discovered,
        "editable_fixture should be discovered via entry point fallback"
    );
}
/// A dotted distribution name (`namespace.pkg`) should be recorded with both
/// its raw name and the underscore form `namespace_pkg`.
#[test]
fn test_discover_editable_installs_namespace_package() {
    let sandbox = tempdir().unwrap();
    let site_packages = sandbox.path();
    let source_root = tempdir().unwrap();
    let source_path = source_root.path();

    // Nested layout: <source root>/namespace/pkg/__init__.py
    let package_root = source_path.join("namespace").join("pkg");
    fs::create_dir_all(&package_root).unwrap();
    fs::write(package_root.join("__init__.py"), "").unwrap();

    let metadata_dir = site_packages.join("namespace.pkg-1.0.0.dist-info");
    fs::create_dir_all(&metadata_dir).unwrap();
    let direct_url = serde_json::json!({
        "url": format!("file://{}", source_path.display()),
        "dir_info": { "editable": true }
    });
    let direct_url_text = serde_json::to_string(&direct_url).unwrap();
    fs::write(metadata_dir.join("direct_url.json"), direct_url_text).unwrap();

    fs::write(
        site_packages.join("__editable__.namespace.pkg-1.0.0.pth"),
        format!("{}\n", source_path.display()),
    )
    .unwrap();

    let db = FixtureDatabase::new();
    db.discover_editable_installs(site_packages);

    let installs = db.editable_install_roots.lock().unwrap();
    assert_eq!(
        installs.len(),
        1,
        "Should discover namespace editable install"
    );
    let install = &installs[0];
    assert_eq!(install.package_name, "namespace_pkg");
    assert_eq!(install.raw_package_name, "namespace.pkg");
    assert_eq!(install.source_root, source_path.canonicalize().unwrap());
}
/// Looking up package `foo` must not match `foo-bar.pth`, while looking up
/// `foo-bar` / `foo_bar` must match it.
#[test]
fn test_pth_prefix_matching_no_false_positive() {
    let sandbox = tempdir().unwrap();
    let site_packages = sandbox.path();

    let foo_root = tempdir().unwrap();
    fs::create_dir_all(foo_root.path()).unwrap();
    let foo_bar_root = tempdir().unwrap();
    fs::create_dir_all(foo_bar_root.path()).unwrap();

    // Only `foo-bar` has a .pth file.
    let pth_body = format!("{}\n", foo_bar_root.path().display());
    fs::write(site_packages.join("foo-bar.pth"), pth_body).unwrap();

    let pth_index = FixtureDatabase::build_pth_index(site_packages);

    let foo_lookup =
        FixtureDatabase::find_editable_pth_source_root(&pth_index, "foo", "foo", site_packages);
    assert!(
        foo_lookup.is_none(),
        "foo should not match foo-bar.pth (different package)"
    );

    let foo_bar_lookup = FixtureDatabase::find_editable_pth_source_root(
        &pth_index,
        "foo-bar",
        "foo_bar",
        site_packages,
    );
    assert!(
        foo_bar_lookup.is_some(),
        "foo-bar should match foo-bar.pth exactly"
    );
}
/// A module listed in a plugin file's `pytest_plugins` should have its
/// fixtures flagged `is_plugin`, stay non-third-party, and be available to a
/// test file elsewhere in the workspace.
#[test]
fn test_transitive_plugin_status_via_pytest_plugins() {
    let workspace = tempdir().unwrap();
    let root = workspace.path().canonicalize().unwrap();
    let db = FixtureDatabase::new();
    *db.workspace_root.lock().unwrap() = Some(root.clone());

    let package_root = root.join("mypackage");
    fs::create_dir_all(&package_root).unwrap();
    fs::write(package_root.join("__init__.py"), "").unwrap();

    let plugin_source = r#"
import pytest
pytest_plugins = ["mypackage.helpers"]
@pytest.fixture
def direct_plugin_fixture():
return "from plugin.py"
"#;
    let plugin_path = package_root.join("plugin.py");
    fs::write(&plugin_path, plugin_source).unwrap();

    let helpers_source = r#"
import pytest
@pytest.fixture
def transitive_plugin_fixture():
return "from helpers.py, imported by plugin.py"
"#;
    fs::write(package_root.join("helpers.py"), helpers_source).unwrap();

    // Register plugin.py as a plugin file, analyze it, then follow its imports.
    let plugin_key = plugin_path.canonicalize().unwrap();
    db.plugin_fixture_files.insert(plugin_key.clone(), ());
    db.analyze_file(plugin_key, plugin_source);
    db.scan_imported_fixture_modules(&root);

    // Reads `is_plugin` from the first definition recorded under `name`.
    let plugin_flag = |name: &str| db.definitions.get(name).map(|defs| defs[0].is_plugin);

    assert_eq!(
        plugin_flag("direct_plugin_fixture"),
        Some(true),
        "direct_plugin_fixture should have is_plugin=true"
    );
    assert_eq!(
        plugin_flag("transitive_plugin_fixture"),
        Some(true),
        "transitive_plugin_fixture should have is_plugin=true (propagated from plugin.py)"
    );

    let third_party_flag = db
        .definitions
        .get("transitive_plugin_fixture")
        .map(|defs| defs[0].is_third_party);
    assert_eq!(
        third_party_flag,
        Some(false),
        "transitive_plugin_fixture should NOT be third-party (workspace-local)"
    );

    // A test file in a sibling directory should see both fixtures.
    let tests_dir = root.join("tests");
    fs::create_dir_all(&tests_dir).unwrap();
    let test_path = tests_dir.join("test_transitive.py");
    let test_source = "def test_transitive(): pass\n";
    fs::write(&test_path, test_source).unwrap();
    let test_key = test_path.canonicalize().unwrap();
    db.analyze_file(test_key.clone(), test_source);

    let available = db.get_available_fixtures(&test_key);
    let available_names: Vec<&str> = available.iter().map(|def| def.name.as_str()).collect();
    assert!(
        available_names.contains(&"direct_plugin_fixture"),
        "direct_plugin_fixture should be available. Got: {:?}",
        available_names
    );
    assert!(
        available_names.contains(&"transitive_plugin_fixture"),
        "transitive_plugin_fixture should be available (transitively via plugin). Got: {:?}",
        available_names
    );
}
/// A module star-imported by a plugin file should have its fixtures flagged
/// `is_plugin` and be available to a test file in the workspace.
#[test]
fn test_transitive_plugin_status_via_star_import() {
    let workspace = tempdir().unwrap();
    let root = workspace.path().canonicalize().unwrap();
    let db = FixtureDatabase::new();
    *db.workspace_root.lock().unwrap() = Some(root.clone());

    let package_root = root.join("mypackage");
    fs::create_dir_all(&package_root).unwrap();
    fs::write(package_root.join("__init__.py"), "").unwrap();

    let plugin_source = r#"
import pytest
from .fixtures import *
@pytest.fixture
def star_direct_fixture():
return "from plugin.py"
"#;
    let plugin_path = package_root.join("plugin.py");
    fs::write(&plugin_path, plugin_source).unwrap();

    let fixtures_source = r#"
import pytest
@pytest.fixture
def star_imported_fixture():
return "from fixtures.py, star-imported by plugin.py"
"#;
    fs::write(package_root.join("fixtures.py"), fixtures_source).unwrap();

    // Register plugin.py as a plugin file, analyze it, then follow its imports.
    let plugin_key = plugin_path.canonicalize().unwrap();
    db.plugin_fixture_files.insert(plugin_key.clone(), ());
    db.analyze_file(plugin_key, plugin_source);
    db.scan_imported_fixture_modules(&root);

    let star_flag = db
        .definitions
        .get("star_imported_fixture")
        .map(|defs| defs[0].is_plugin);
    assert_eq!(
        star_flag,
        Some(true),
        "star_imported_fixture should have is_plugin=true (propagated from plugin.py via star import)"
    );

    // A test file at the workspace root should see both fixtures.
    let test_path = root.join("test_star.py");
    let test_source = "def test_star(): pass\n";
    fs::write(&test_path, test_source).unwrap();
    let test_key = test_path.canonicalize().unwrap();
    db.analyze_file(test_key.clone(), test_source);

    let available = db.get_available_fixtures(&test_key);
    let available_names: Vec<&str> = available.iter().map(|def| def.name.as_str()).collect();
    assert!(
        available_names.contains(&"star_direct_fixture"),
        "star_direct_fixture should be available. Got: {:?}",
        available_names
    );
    assert!(
        available_names.contains(&"star_imported_fixture"),
        "star_imported_fixture should be available (transitively via star import). Got: {:?}",
        available_names
    );
}
/// A module star-imported by a `conftest.py` that is NOT registered as a
/// plugin file must not have its fixtures flagged `is_plugin`.
#[test]
fn test_non_plugin_conftest_import_not_marked_as_plugin() {
    let workspace = tempdir().unwrap();
    let root = workspace.path().canonicalize().unwrap();
    let db = FixtureDatabase::new();
    *db.workspace_root.lock().unwrap() = Some(root.clone());

    let conftest_source = r#"
import pytest
from .helpers import *
"#;
    let conftest_path = root.join("conftest.py");
    fs::write(&conftest_path, conftest_source).unwrap();

    let helpers_source = r#"
import pytest
@pytest.fixture
def conftest_helper_fixture():
return "from helpers, imported by conftest"
"#;
    fs::write(root.join("helpers.py"), helpers_source).unwrap();

    // Note: conftest.py is analyzed but never added to `plugin_fixture_files`.
    let conftest_key = conftest_path.canonicalize().unwrap();
    db.analyze_file(conftest_key, conftest_source);
    db.scan_imported_fixture_modules(&root);

    let plugin_flag = db
        .definitions
        .get("conftest_helper_fixture")
        .map(|defs| defs[0].is_plugin);
    // An absent fixture is tolerated; only an explicit `true` flag is a failure.
    assert!(
        !plugin_flag.unwrap_or(false),
        "Fixture imported by conftest (not a plugin) should NOT be marked is_plugin"
    );
}
/// A module analyzed BEFORE a plugin file names it in `pytest_plugins` should
/// end up with `is_plugin=true` once the plugin's imports are scanned.
#[test]
fn test_already_cached_module_marked_plugin_via_pytest_plugins() {
    let workspace = tempdir().unwrap();
    let root = workspace.path().canonicalize().unwrap();
    let db = FixtureDatabase::new();
    *db.workspace_root.lock().unwrap() = Some(root.clone());

    let package_root = root.join("mypackage");
    fs::create_dir_all(&package_root).unwrap();
    fs::write(package_root.join("__init__.py"), "").unwrap();

    // Reads `is_plugin` from the first definition of the fixture under test.
    let cached_flag = || {
        db.definitions
            .get("pre_cached_fixture")
            .map(|defs| defs[0].is_plugin)
    };

    // Step 1: analyze helpers.py on its own, before any plugin file exists.
    let helpers_source = r#"
import pytest
@pytest.fixture
def pre_cached_fixture():
return "I was analyzed before the plugin scan"
"#;
    let helpers_path = package_root.join("helpers.py");
    fs::write(&helpers_path, helpers_source).unwrap();
    let helpers_key = helpers_path.canonicalize().unwrap();
    db.analyze_file(helpers_key, helpers_source);
    assert_eq!(
        cached_flag(),
        Some(false),
        "pre_cached_fixture should initially have is_plugin=false"
    );

    // Step 2: introduce a plugin file that declares helpers via pytest_plugins.
    let plugin_source = r#"
import pytest
pytest_plugins = ["mypackage.helpers"]
@pytest.fixture
def direct_fixture():
return "from plugin.py"
"#;
    let plugin_path = package_root.join("plugin.py");
    fs::write(&plugin_path, plugin_source).unwrap();
    let plugin_key = plugin_path.canonicalize().unwrap();
    db.plugin_fixture_files.insert(plugin_key.clone(), ());
    db.analyze_file(plugin_key, plugin_source);
    db.scan_imported_fixture_modules(&root);

    assert_eq!(
        cached_flag(),
        Some(true),
        "pre_cached_fixture should have is_plugin=true after re-analysis \
         (was already cached when plugin declared pytest_plugins)"
    );
}
/// A module analyzed BEFORE a plugin file star-imports it should end up with
/// `is_plugin=true` once the plugin's imports are scanned.
#[test]
fn test_already_cached_module_marked_plugin_via_star_import() {
    let workspace = tempdir().unwrap();
    let root = workspace.path().canonicalize().unwrap();
    let db = FixtureDatabase::new();
    *db.workspace_root.lock().unwrap() = Some(root.clone());

    let package_root = root.join("mypkg");
    fs::create_dir_all(&package_root).unwrap();
    fs::write(package_root.join("__init__.py"), "").unwrap();

    // Reads `is_plugin` from the first definition of the fixture under test.
    let cached_flag = || {
        db.definitions
            .get("star_pre_cached")
            .map(|defs| defs[0].is_plugin)
    };

    // Step 1: analyze fixtures.py on its own, before any plugin file exists.
    let fixtures_source = r#"
import pytest
@pytest.fixture
def star_pre_cached():
return "cached before plugin scan"
"#;
    let fixtures_path = package_root.join("fixtures.py");
    fs::write(&fixtures_path, fixtures_source).unwrap();
    let fixtures_key = fixtures_path.canonicalize().unwrap();
    db.analyze_file(fixtures_key, fixtures_source);
    assert_eq!(cached_flag(), Some(false), "should start as is_plugin=false");

    // Step 2: introduce a plugin file that star-imports fixtures.py.
    let plugin_source = r#"
import pytest
from .fixtures import *
@pytest.fixture
def plugin_direct():
return "direct"
"#;
    let plugin_path = package_root.join("plugin.py");
    fs::write(&plugin_path, plugin_source).unwrap();
    let plugin_key = plugin_path.canonicalize().unwrap();
    db.plugin_fixture_files.insert(plugin_key.clone(), ());
    db.analyze_file(plugin_key, plugin_source);
    db.scan_imported_fixture_modules(&root);

    assert_eq!(
        cached_flag(),
        Some(true),
        "star_pre_cached should have is_plugin=true after re-analysis \
         (module was star-imported by a plugin file)"
    );
}
/// An explicit `from .utils import helper_function` in a plugin file must not
/// make fixtures defined in `utils.py` count as plugin fixtures.
#[test]
fn test_explicit_import_does_not_propagate_plugin_status() {
    let workspace = tempdir().unwrap();
    let root = workspace.path().canonicalize().unwrap();
    let db = FixtureDatabase::new();
    *db.workspace_root.lock().unwrap() = Some(root.clone());

    let package_root = root.join("explpkg");
    fs::create_dir_all(&package_root).unwrap();
    fs::write(package_root.join("__init__.py"), "").unwrap();

    // utils.py holds both a plain helper and a fixture.
    let utils_source = r#"
import pytest
def helper_function():
return 42
@pytest.fixture
def utils_fixture():
return "from utils"
"#;
    fs::write(package_root.join("utils.py"), utils_source).unwrap();

    // The plugin imports only the helper by name.
    let plugin_source = r#"
import pytest
from .utils import helper_function
@pytest.fixture
def explicit_plugin_fixture():
return helper_function()
"#;
    let plugin_path = package_root.join("plugin.py");
    fs::write(&plugin_path, plugin_source).unwrap();
    let plugin_key = plugin_path.canonicalize().unwrap();
    db.plugin_fixture_files.insert(plugin_key.clone(), ());
    db.analyze_file(plugin_key, plugin_source);
    db.scan_imported_fixture_modules(&root);

    // Reads `is_plugin` from the first definition recorded under `name`.
    let plugin_flag = |name: &str| db.definitions.get(name).map(|defs| defs[0].is_plugin);

    assert_eq!(
        plugin_flag("explicit_plugin_fixture"),
        Some(true),
        "explicit_plugin_fixture should have is_plugin=true"
    );

    // An absent `utils_fixture` is tolerated; only an explicit `true` flag fails.
    assert!(
        !plugin_flag("utils_fixture").unwrap_or(false),
        "utils_fixture should NOT be is_plugin — the plugin only did \
         an explicit import of helper_function, not a star import"
    );
}
}