use std::path::Path;
use serial_test::serial;
use socket_patch_core::crawlers::python_crawler::{
find_local_venv_site_packages, find_python_command_with, find_python_dirs,
get_global_python_site_packages, parse_python_site_packages_output, read_python_metadata,
};
use socket_patch_core::crawlers::types::CrawlerOptions;
use socket_patch_core::crawlers::PythonCrawler;
#[test]
fn parse_python_site_packages_output_well_formed() {
let stdout =
"/usr/local/lib/python3.11/site-packages\n/usr/local/lib/python3.11/dist-packages\n";
let paths = parse_python_site_packages_output(stdout);
assert_eq!(paths.len(), 2);
assert_eq!(
paths[0],
std::path::PathBuf::from("/usr/local/lib/python3.11/site-packages")
);
}
#[test]
fn parse_python_site_packages_output_empty_returns_empty() {
assert!(parse_python_site_packages_output("").is_empty());
assert!(parse_python_site_packages_output("\n \n").is_empty());
}
#[test]
fn parse_python_site_packages_output_trims_and_skips_blanks() {
let stdout = " /a/b \n\n \n/c/d\n";
let paths = parse_python_site_packages_output(stdout);
assert_eq!(paths.len(), 2);
assert_eq!(paths[0], std::path::PathBuf::from("/a/b"));
assert_eq!(paths[1], std::path::PathBuf::from("/c/d"));
}
#[test]
fn find_python_command_with_mock_runner_prefers_python3() {
let runner = common::MockCommandRunner::new().with_response(
"python3",
&["--version"],
Some("Python 3.11.5\n"),
);
assert_eq!(find_python_command_with(&runner), Some("python3"));
}
#[test]
fn find_python_command_with_mock_runner_falls_through_to_python() {
let runner = common::MockCommandRunner::new().with_response(
"python",
&["--version"],
Some("Python 2.7.18\n"),
);
assert_eq!(find_python_command_with(&runner), Some("python"));
}
#[test]
fn find_python_command_with_mock_runner_none_when_no_binary() {
let runner = common::MockCommandRunner::new();
assert_eq!(find_python_command_with(&runner), None);
}
async fn stage_python_layout(root: &Path, py_ver: &str) -> std::path::PathBuf {
let sp = root
.join(format!("python{py_ver}"))
.join("lib")
.join(format!("python{py_ver}"))
.join("site-packages");
tokio::fs::create_dir_all(&sp).await.unwrap();
sp
}
#[tokio::test]
async fn find_python_dirs_python3_wildcard_matches_versions() {
let tmp = tempfile::tempdir().unwrap();
let p1 = stage_python_layout(tmp.path(), "3.11").await;
let _p2 = stage_python_layout(tmp.path(), "3.12").await;
tokio::fs::create_dir_all(tmp.path().join("python2.7").join("lib"))
.await
.unwrap();
let result = find_python_dirs(
tmp.path(),
&["python3.*", "lib", "python3.*", "site-packages"],
)
.await;
assert!(
result.iter().any(|r| r == &p1),
"must find python3.11 layout; got {result:?}"
);
assert_eq!(result.len(), 2, "must find exactly python3.11 + python3.12");
}
#[tokio::test]
async fn find_python_dirs_star_wildcard_matches_all() {
let tmp = tempfile::tempdir().unwrap();
tokio::fs::create_dir_all(
tmp.path()
.join("pkg_a")
.join("lib")
.join("python3.11")
.join("site-packages"),
)
.await
.unwrap();
tokio::fs::create_dir_all(
tmp.path()
.join("pkg_b")
.join("lib")
.join("python3.11")
.join("site-packages"),
)
.await
.unwrap();
let result = find_python_dirs(tmp.path(), &["*", "lib", "python3.*", "site-packages"]).await;
assert_eq!(result.len(), 2, "* must match both pkg_a and pkg_b");
}
#[tokio::test]
async fn find_python_dirs_star_wildcard_skips_files() {
let tmp = tempfile::tempdir().unwrap();
tokio::fs::write(tmp.path().join("not_a_dir.txt"), b"x")
.await
.unwrap();
tokio::fs::create_dir_all(
tmp.path()
.join("real")
.join("lib")
.join("python3.11")
.join("site-packages"),
)
.await
.unwrap();
let result = find_python_dirs(tmp.path(), &["*", "lib", "python3.*", "site-packages"]).await;
assert_eq!(result.len(), 1, "regular file must be skipped");
}
#[tokio::test]
async fn find_python_dirs_nonexistent_base_returns_empty() {
let tmp = tempfile::tempdir().unwrap();
let absent = tmp.path().join("does-not-exist");
let result = find_python_dirs(&absent, &["python3.*", "site-packages"]).await;
assert!(result.is_empty());
}
#[tokio::test]
async fn find_python_dirs_empty_segments_returns_base() {
let tmp = tempfile::tempdir().unwrap();
let result = find_python_dirs(tmp.path(), &[]).await;
assert_eq!(result.len(), 1);
assert_eq!(result[0], tmp.path());
}
#[tokio::test]
async fn find_python_dirs_literal_segment_descends() {
let tmp = tempfile::tempdir().unwrap();
let target = tmp.path().join("literal_subdir").join("more");
tokio::fs::create_dir_all(&target).await.unwrap();
let result = find_python_dirs(tmp.path(), &["literal_subdir", "more"]).await;
assert_eq!(result.len(), 1);
assert_eq!(result[0], target);
}
fn venv_site_packages_relpath() -> std::path::PathBuf {
#[cfg(windows)]
{
std::path::Path::new("Lib").join("site-packages")
}
#[cfg(not(windows))]
{
std::path::Path::new("lib")
.join("python3.11")
.join("site-packages")
}
}
#[tokio::test]
#[serial]
async fn find_local_venv_site_packages_honors_virtual_env_var() {
let tmp = tempfile::tempdir().unwrap();
let venv = tmp.path().join("custom-venv");
let sp = venv.join(venv_site_packages_relpath());
tokio::fs::create_dir_all(&sp).await.unwrap();
let prev = std::env::var("VIRTUAL_ENV").ok();
std::env::set_var("VIRTUAL_ENV", &venv);
let result = find_local_venv_site_packages(tmp.path()).await;
std::env::remove_var("VIRTUAL_ENV");
if let Some(v) = prev {
std::env::set_var("VIRTUAL_ENV", v);
}
assert!(
result.iter().any(|p| p == &sp),
"VIRTUAL_ENV path must surface; got {result:?}"
);
}
#[tokio::test]
#[serial]
async fn find_local_venv_site_packages_discovers_dot_venv() {
let tmp = tempfile::tempdir().unwrap();
let sp = tmp.path().join(".venv").join(venv_site_packages_relpath());
tokio::fs::create_dir_all(&sp).await.unwrap();
let prev = std::env::var("VIRTUAL_ENV").ok();
std::env::remove_var("VIRTUAL_ENV");
let result = find_local_venv_site_packages(tmp.path()).await;
if let Some(v) = prev {
std::env::set_var("VIRTUAL_ENV", v);
}
assert!(
result.iter().any(|p| p == &sp),
".venv must be discovered; got {result:?}"
);
}
#[tokio::test]
#[serial]
async fn find_local_venv_site_packages_discovers_venv_dir() {
let tmp = tempfile::tempdir().unwrap();
let sp = tmp.path().join("venv").join(venv_site_packages_relpath());
tokio::fs::create_dir_all(&sp).await.unwrap();
let prev = std::env::var("VIRTUAL_ENV").ok();
std::env::remove_var("VIRTUAL_ENV");
let result = find_local_venv_site_packages(tmp.path()).await;
if let Some(v) = prev {
std::env::set_var("VIRTUAL_ENV", v);
}
assert!(
result.iter().any(|p| p == &sp),
"venv must be discovered; got {result:?}"
);
}
#[tokio::test]
#[serial]
async fn get_global_python_site_packages_discovers_anaconda() {
let tmp = tempfile::tempdir().unwrap();
let anaconda_sp = tmp
.path()
.join("anaconda3")
.join("lib")
.join("python3.11")
.join("site-packages");
tokio::fs::create_dir_all(&anaconda_sp).await.unwrap();
let prev_home = std::env::var("HOME").ok();
std::env::set_var("HOME", tmp.path());
let result = get_global_python_site_packages().await;
if let Some(v) = prev_home {
std::env::set_var("HOME", v);
}
assert!(
result.iter().any(|p| p == &anaconda_sp),
"staged anaconda path must surface; got {result:?}"
);
}
#[cfg(target_os = "macos")]
#[tokio::test]
#[serial]
async fn get_global_python_site_packages_discovers_uv_tools_macos() {
let tmp = tempfile::tempdir().unwrap();
let sp = tmp
.path()
.join("Library")
.join("Application Support")
.join("uv")
.join("tools")
.join("black")
.join("lib")
.join("python3.11")
.join("site-packages");
tokio::fs::create_dir_all(&sp).await.unwrap();
let prev_home = std::env::var("HOME").ok();
std::env::set_var("HOME", tmp.path());
let result = get_global_python_site_packages().await;
if let Some(v) = prev_home {
std::env::set_var("HOME", v);
}
assert!(
result.iter().any(|p| p == &sp),
"uv tools layout must surface; got {result:?}"
);
}
#[cfg(all(not(target_os = "macos"), not(windows)))]
#[tokio::test]
#[serial]
async fn get_global_python_site_packages_discovers_uv_tools_linux() {
let tmp = tempfile::tempdir().unwrap();
let sp = tmp
.path()
.join(".local")
.join("share")
.join("uv")
.join("tools")
.join("black")
.join("lib")
.join("python3.11")
.join("site-packages");
tokio::fs::create_dir_all(&sp).await.unwrap();
let prev_home = std::env::var("HOME").ok();
std::env::set_var("HOME", tmp.path());
let result = get_global_python_site_packages().await;
if let Some(v) = prev_home {
std::env::set_var("HOME", v);
}
assert!(
result.iter().any(|p| p == &sp),
"uv tools layout must surface; got {result:?}"
);
}
#[cfg(not(windows))]
#[tokio::test]
#[serial]
async fn get_global_python_site_packages_discovers_uv_python_install() {
let tmp = tempfile::tempdir().unwrap();
let sp = tmp
.path()
.join(".local")
.join("share")
.join("uv")
.join("python")
.join("cpython-3.11.6-macos-aarch64-none")
.join("lib")
.join("python3.11")
.join("site-packages");
tokio::fs::create_dir_all(&sp).await.unwrap();
let prev_home = std::env::var("HOME").ok();
std::env::set_var("HOME", tmp.path());
let result = get_global_python_site_packages().await;
if let Some(v) = prev_home {
std::env::set_var("HOME", v);
}
assert!(
result.iter().any(|p| p == &sp),
"uv-python managed interpreter site-packages must surface; got {result:?}"
);
}
#[tokio::test]
#[serial]
async fn get_site_packages_paths_falls_back_via_pyproject_marker() {
let project = tempfile::tempdir().unwrap();
let home = tempfile::tempdir().unwrap();
tokio::fs::write(
project.path().join("pyproject.toml"),
b"[project]\nname = \"x\"\n",
)
.await
.unwrap();
#[cfg(target_os = "macos")]
let staged = home
.path()
.join("Library")
.join("Application Support")
.join("uv")
.join("tools")
.join("ruff")
.join("lib")
.join("python3.11")
.join("site-packages");
#[cfg(all(not(target_os = "macos"), not(windows)))]
let staged = home
.path()
.join(".local")
.join("share")
.join("uv")
.join("tools")
.join("ruff")
.join("lib")
.join("python3.11")
.join("site-packages");
#[cfg(windows)]
let staged = home.path().join("uv-fake-staged");
tokio::fs::create_dir_all(&staged).await.unwrap();
let prev_home = std::env::var("HOME").ok();
std::env::set_var("HOME", home.path());
let crawler = PythonCrawler;
let opts = CrawlerOptions {
cwd: project.path().to_path_buf(),
global: false,
global_prefix: None,
batch_size: 100,
};
let result = crawler.get_site_packages_paths(&opts).await.unwrap();
if let Some(v) = prev_home {
std::env::set_var("HOME", v);
}
#[cfg(not(windows))]
assert!(
result.iter().any(|p| p == &staged),
"pyproject.toml marker must trigger global fallback; got {result:?}"
);
#[cfg(windows)]
let _ = result;
}
#[tokio::test]
#[serial]
async fn get_site_packages_paths_falls_back_via_uv_lock_marker() {
let project = tempfile::tempdir().unwrap();
let home = tempfile::tempdir().unwrap();
tokio::fs::write(project.path().join("uv.lock"), b"version = 1\n")
.await
.unwrap();
let prev_home = std::env::var("HOME").ok();
std::env::set_var("HOME", home.path());
let crawler = PythonCrawler;
let opts = CrawlerOptions {
cwd: project.path().to_path_buf(),
global: false,
global_prefix: None,
batch_size: 100,
};
let _ = crawler.get_site_packages_paths(&opts).await.unwrap();
if let Some(v) = prev_home {
std::env::set_var("HOME", v);
}
}
#[tokio::test]
#[serial]
async fn get_site_packages_paths_no_marker_no_venv_returns_empty() {
let project = tempfile::tempdir().unwrap();
let crawler = PythonCrawler;
let opts = CrawlerOptions {
cwd: project.path().to_path_buf(),
global: false,
global_prefix: None,
batch_size: 100,
};
let prev_virtual_env = std::env::var("VIRTUAL_ENV").ok();
std::env::remove_var("VIRTUAL_ENV");
let result = crawler.get_site_packages_paths(&opts).await.unwrap();
if let Some(v) = prev_virtual_env {
std::env::set_var("VIRTUAL_ENV", v);
}
assert!(
result.is_empty(),
"non-python project must produce zero paths; got {result:?}"
);
}
#[tokio::test]
async fn read_python_metadata_well_formed() {
let tmp = tempfile::tempdir().unwrap();
let dist_info = tmp.path().join("requests-2.28.0.dist-info");
tokio::fs::create_dir(&dist_info).await.unwrap();
tokio::fs::write(
dist_info.join("METADATA"),
"Metadata-Version: 2.1\nName: requests\nVersion: 2.28.0\n",
)
.await
.unwrap();
let result = read_python_metadata(&dist_info).await;
assert_eq!(result, Some(("requests".to_string(), "2.28.0".to_string())));
}
#[tokio::test]
async fn read_python_metadata_missing_file_falls_back_to_dir_name() {
let tmp = tempfile::tempdir().unwrap();
let dist_info = tmp.path().join("requests-2.28.0.dist-info");
tokio::fs::create_dir(&dist_info).await.unwrap();
let result = read_python_metadata(&dist_info).await;
assert_eq!(result, Some(("requests".to_string(), "2.28.0".to_string())));
}
#[tokio::test]
async fn read_python_metadata_missing_name_falls_back_to_dir_name() {
let tmp = tempfile::tempdir().unwrap();
let dist_info = tmp.path().join("requests-2.28.0.dist-info");
tokio::fs::create_dir(&dist_info).await.unwrap();
tokio::fs::write(
dist_info.join("METADATA"),
"Metadata-Version: 2.1\nVersion: 2.28.0\n",
)
.await
.unwrap();
let result = read_python_metadata(&dist_info).await;
assert_eq!(result, Some(("requests".to_string(), "2.28.0".to_string())));
}
#[path = "common/mod.rs"]
mod common;
#[cfg(unix)]
#[tokio::test]
async fn find_by_purls_handles_unreadable_site_packages() {
if common::uid_is_root() {
eprintln!("SKIP: chmod 000 is a no-op under root");
return;
}
let tmp = tempfile::tempdir().unwrap();
let site_packages = tmp.path().join("sp");
tokio::fs::create_dir(&site_packages).await.unwrap();
common::chmod_unreadable(&site_packages);
let crawler = PythonCrawler;
let result = crawler
.find_by_purls(&site_packages, &["pkg:pypi/requests@2.28.0".to_string()])
.await
.unwrap();
common::chmod_readable(&site_packages);
assert!(result.is_empty());
}
#[cfg(unix)]
#[tokio::test]
async fn crawl_all_handles_unreadable_site_packages() {
if common::uid_is_root() {
eprintln!("SKIP: chmod 000 is a no-op under root");
return;
}
let tmp = tempfile::tempdir().unwrap();
let site_packages = tmp.path().join("sp");
tokio::fs::create_dir(&site_packages).await.unwrap();
common::chmod_unreadable(&site_packages);
let crawler = PythonCrawler;
let opts = CrawlerOptions {
cwd: tmp.path().to_path_buf(),
global: true,
global_prefix: Some(site_packages.clone()),
batch_size: 100,
};
let result = crawler.crawl_all(&opts).await;
common::chmod_readable(&site_packages);
assert!(result.is_empty());
}
#[test]
fn python_crawler_default_and_new_construct_cleanly() {
let _a = PythonCrawler::default();
let _b = PythonCrawler::new();
}
async fn stage_dist_info(site_packages: &Path, raw_name: &str, version: &str) {
let dist = site_packages.join(format!("{raw_name}-{version}.dist-info"));
tokio::fs::create_dir_all(&dist).await.unwrap();
let metadata = format!("Metadata-Version: 2.1\nName: {raw_name}\nVersion: {version}\n");
tokio::fs::write(dist.join("METADATA"), metadata)
.await
.unwrap();
}
#[tokio::test]
async fn find_by_purls_matches_canonicalized_name() {
let tmp = tempfile::tempdir().unwrap();
stage_dist_info(tmp.path(), "Requests", "2.28.0").await;
let crawler = PythonCrawler;
let result = crawler
.find_by_purls(tmp.path(), &["pkg:pypi/requests@2.28.0".to_string()])
.await
.unwrap();
assert_eq!(result.len(), 1, "canonical lookup must hit");
}
#[tokio::test]
async fn find_by_purls_strips_qualifiers() {
let tmp = tempfile::tempdir().unwrap();
stage_dist_info(tmp.path(), "requests", "2.28.0").await;
let crawler = PythonCrawler;
let result = crawler
.find_by_purls(
tmp.path(),
&["pkg:pypi/requests@2.28.0?extension=tar.gz".to_string()],
)
.await
.unwrap();
assert_eq!(result.len(), 1, "qualifiers must be stripped before lookup");
}
#[tokio::test]
async fn find_by_purls_empty_purls_returns_empty() {
let tmp = tempfile::tempdir().unwrap();
stage_dist_info(tmp.path(), "requests", "2.28.0").await;
let crawler = PythonCrawler;
let result = crawler.find_by_purls(tmp.path(), &[]).await.unwrap();
assert!(result.is_empty());
}
#[tokio::test]
async fn find_by_purls_missing_site_packages_returns_empty() {
let tmp = tempfile::tempdir().unwrap();
let crawler = PythonCrawler;
let result = crawler
.find_by_purls(
&tmp.path().join("no-such-dir"),
&["pkg:pypi/requests@2.28.0".to_string()],
)
.await
.unwrap();
assert!(result.is_empty());
}
#[tokio::test]
async fn find_by_purls_invalid_purl_skipped() {
let tmp = tempfile::tempdir().unwrap();
stage_dist_info(tmp.path(), "requests", "2.28.0").await;
let crawler = PythonCrawler;
let result = crawler
.find_by_purls(tmp.path(), &["pkg:not-pypi/foo@1.0".to_string()])
.await
.unwrap();
assert!(result.is_empty());
}
#[tokio::test]
async fn find_by_purls_version_mismatch_returns_empty() {
let tmp = tempfile::tempdir().unwrap();
stage_dist_info(tmp.path(), "requests", "2.28.0").await;
let crawler = PythonCrawler;
let result = crawler
.find_by_purls(tmp.path(), &["pkg:pypi/requests@99.99.99".to_string()])
.await
.unwrap();
assert!(result.is_empty());
}
#[tokio::test]
async fn crawl_all_via_site_packages_finds_dist_info_packages() {
let tmp = tempfile::tempdir().unwrap();
stage_dist_info(tmp.path(), "Requests", "2.28.0").await;
stage_dist_info(tmp.path(), "urllib3", "2.0.0").await;
tokio::fs::create_dir_all(tmp.path().join("ignore-me"))
.await
.unwrap();
let crawler = PythonCrawler;
let opts = CrawlerOptions {
cwd: tmp.path().to_path_buf(),
global: true,
global_prefix: Some(tmp.path().to_path_buf()),
batch_size: 100,
};
let result = crawler.crawl_all(&opts).await;
let names: Vec<&str> = result.iter().map(|p| p.name.as_str()).collect();
assert!(names.contains(&"requests"));
assert!(names.contains(&"urllib3"));
assert_eq!(result.len(), 2);
}
#[tokio::test]
async fn crawl_all_with_unparseable_dist_info_skips() {
let tmp = tempfile::tempdir().unwrap();
let dist = tmp.path().join("corrupt.dist-info");
tokio::fs::create_dir_all(&dist).await.unwrap();
tokio::fs::write(dist.join("METADATA"), b"").await.unwrap();
let crawler = PythonCrawler;
let opts = CrawlerOptions {
cwd: tmp.path().to_path_buf(),
global: true,
global_prefix: Some(tmp.path().to_path_buf()),
batch_size: 100,
};
let result = crawler.crawl_all(&opts).await;
assert!(
result.is_empty(),
"a dist-info with no usable metadata or version-bearing name must be skipped"
);
}
#[tokio::test]
async fn get_site_packages_paths_with_global_prefix_passthrough() {
let tmp = tempfile::tempdir().unwrap();
let custom = tmp.path().join("custom-sp");
tokio::fs::create_dir_all(&custom).await.unwrap();
let crawler = PythonCrawler;
let opts = CrawlerOptions {
cwd: tmp.path().to_path_buf(),
global: false,
global_prefix: Some(custom.clone()),
batch_size: 100,
};
let paths = crawler.get_site_packages_paths(&opts).await.unwrap();
assert_eq!(paths, vec![custom]);
}
#[tokio::test]
async fn read_python_metadata_stops_at_blank_line_then_falls_back() {
let tmp = tempfile::tempdir().unwrap();
let dist = tmp.path().join("requests-9.9.9.dist-info");
tokio::fs::create_dir(&dist).await.unwrap();
tokio::fs::write(dist.join("METADATA"), "Name: requests\n\nVersion: 2.28.0\n")
.await
.unwrap();
let result = read_python_metadata(&dist).await;
assert_eq!(
result,
Some(("requests".to_string(), "9.9.9".to_string())),
"blank-line break must fire before Version is read, so the version \
comes from the dir name; got {result:?}"
);
}
#[tokio::test]
async fn read_python_metadata_missing_version_falls_back_to_dir_name() {
let tmp = tempfile::tempdir().unwrap();
let dist_info = tmp.path().join("requests-2.28.0.dist-info");
tokio::fs::create_dir(&dist_info).await.unwrap();
tokio::fs::write(
dist_info.join("METADATA"),
"Metadata-Version: 2.1\nName: requests\n",
)
.await
.unwrap();
let result = read_python_metadata(&dist_info).await;
assert_eq!(result, Some(("requests".to_string(), "2.28.0".to_string())));
}