use std::path::{Path, PathBuf};
use std::process::{Command, Output};
use sha2::{Digest, Sha256};
const PYPI_UUID: &str = "725a5343-52ec-4290-b7ce-e1cec55878e1";
const PYPI_PURL_PREFIX: &str = "pkg:pypi/pydantic-ai@0.0.36";
fn binary() -> PathBuf {
env!("CARGO_BIN_EXE_socket-patch").into()
}
fn has_python3() -> bool {
Command::new("python3")
.arg("--version")
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.status()
.map(|s| s.success())
.unwrap_or(false)
}
fn git_sha256(content: &[u8]) -> String {
let header = format!("blob {}\0", content.len());
let mut hasher = Sha256::new();
hasher.update(header.as_bytes());
hasher.update(content);
hex::encode(hasher.finalize())
}
fn git_sha256_file(path: &Path) -> String {
let content = std::fs::read(path).unwrap_or_else(|e| panic!("read {}: {e}", path.display()));
git_sha256(&content)
}
fn run(cwd: &Path, args: &[&str]) -> (i32, String, String) {
let out: Output = Command::new(binary())
.args(args)
.current_dir(cwd)
.env_remove("SOCKET_API_TOKEN") .output()
.expect("failed to execute socket-patch binary");
let code = out.status.code().unwrap_or(-1);
let stdout = String::from_utf8_lossy(&out.stdout).to_string();
let stderr = String::from_utf8_lossy(&out.stderr).to_string();
(code, stdout, stderr)
}
fn assert_run_ok(cwd: &Path, args: &[&str], context: &str) -> (String, String) {
let (code, stdout, stderr) = run(cwd, args);
assert_eq!(
code, 0,
"{context} failed (exit {code}).\nstdout:\n{stdout}\nstderr:\n{stderr}"
);
(stdout, stderr)
}
fn find_site_packages(cwd: &Path) -> PathBuf {
let venv = cwd.join(".venv");
if cfg!(windows) {
let sp = venv.join("Lib").join("site-packages");
assert!(sp.exists(), "site-packages not found at {}", sp.display());
return sp;
}
let lib = venv.join("lib");
for entry in std::fs::read_dir(&lib).expect("read .venv/lib") {
let entry = entry.unwrap();
let name = entry.file_name();
let name = name.to_string_lossy();
if name.starts_with("python3.") {
let sp = entry.path().join("site-packages");
if sp.exists() {
return sp;
}
}
}
panic!("site-packages not found under {}", lib.display());
}
fn setup_venv(cwd: &Path) {
let status = Command::new("python3")
.args(["-m", "venv", ".venv"])
.current_dir(cwd)
.status()
.expect("failed to create venv");
assert!(status.success(), "python3 -m venv failed");
let pip = if cfg!(windows) {
cwd.join(".venv/Scripts/pip")
} else {
cwd.join(".venv/bin/pip")
};
let out = Command::new(&pip)
.args([
"install",
"--no-deps",
"--disable-pip-version-check",
"pydantic-ai==0.0.36",
"pydantic-ai-slim==0.0.36",
])
.current_dir(cwd)
.output()
.expect("failed to run pip install");
assert!(
out.status.success(),
"pip install failed.\nstdout:\n{}\nstderr:\n{}",
String::from_utf8_lossy(&out.stdout),
String::from_utf8_lossy(&out.stderr),
);
}
fn read_patch_files(manifest_path: &Path) -> (String, serde_json::Value) {
let manifest: serde_json::Value =
serde_json::from_str(&std::fs::read_to_string(manifest_path).unwrap()).unwrap();
let patches = manifest["patches"].as_object().expect("patches object");
let (purl, patch) = patches
.iter()
.find(|(k, _)| k.starts_with(PYPI_PURL_PREFIX))
.unwrap_or_else(|| panic!("no patch matching {PYPI_PURL_PREFIX} in manifest"));
(purl.clone(), patch["files"].clone())
}
#[test]
#[ignore]
fn test_pypi_full_lifecycle() {
if !has_python3() {
eprintln!("SKIP: python3 not found on PATH");
return;
}
let dir = tempfile::tempdir().unwrap();
let cwd = dir.path();
setup_venv(cwd);
let site_packages = find_site_packages(cwd);
assert!(
site_packages.join("pydantic_ai").exists(),
"pydantic_ai package should be installed in site-packages"
);
let files_to_check = [
"pydantic_ai/messages.py",
"pydantic_ai/models/__init__.py",
"pydantic_ai/models/anthropic.py",
"pydantic_ai/models/gemini.py",
"pydantic_ai/models/openai.py",
];
let original_hashes: Vec<(String, String)> = files_to_check
.iter()
.map(|f| {
let path = site_packages.join(f);
let hash = if path.exists() {
git_sha256_file(&path)
} else {
String::new() };
(f.to_string(), hash)
})
.collect();
assert_run_ok(cwd, &["get", PYPI_UUID], "get");
let manifest_path = cwd.join(".socket/manifest.json");
assert!(manifest_path.exists(), ".socket/manifest.json should exist after get");
let (purl, files_value) = read_patch_files(&manifest_path);
assert!(
purl.starts_with(PYPI_PURL_PREFIX),
"purl should start with {PYPI_PURL_PREFIX}, got {purl}"
);
let files = files_value.as_object().expect("files should be an object");
assert!(!files.is_empty(), "patch should modify at least one file");
for (rel_path, info) in files {
let after_hash = info["afterHash"]
.as_str()
.expect("afterHash should be a string");
let full_path = site_packages.join(rel_path);
assert!(
full_path.exists(),
"patched file should exist: {}",
full_path.display()
);
assert_eq!(
git_sha256_file(&full_path),
after_hash,
"hash mismatch for {rel_path} after get"
);
}
let (stdout, _) = assert_run_ok(cwd, &["list", "--json"], "list --json");
let list: serde_json::Value = serde_json::from_str(&stdout).unwrap();
let patches = list["patches"].as_array().expect("patches array");
assert_eq!(patches.len(), 1, "should have exactly one patch");
assert_eq!(patches[0]["uuid"].as_str().unwrap(), PYPI_UUID);
let vulns = patches[0]["vulnerabilities"]
.as_array()
.expect("vulnerabilities array");
assert!(!vulns.is_empty(), "should have vulnerability info");
let has_cve = vulns.iter().any(|v| {
v["cves"]
.as_array()
.map_or(false, |cves| cves.iter().any(|c| c == "CVE-2026-25580"))
});
assert!(has_cve, "vulnerability list should include CVE-2026-25580");
assert_run_ok(cwd, &["rollback"], "rollback");
for (rel_path, info) in files {
let before_hash = info["beforeHash"].as_str().unwrap_or("");
let full_path = site_packages.join(rel_path);
if before_hash.is_empty() {
assert!(
!full_path.exists(),
"new file {rel_path} should be removed after rollback"
);
} else {
assert_eq!(
git_sha256_file(&full_path),
before_hash,
"{rel_path} should match beforeHash after rollback"
);
}
}
for (rel_path, orig_hash) in &original_hashes {
if orig_hash.is_empty() {
continue; }
let full_path = site_packages.join(rel_path);
if full_path.exists() {
assert_eq!(
git_sha256_file(&full_path),
*orig_hash,
"{rel_path} should match original hash after rollback"
);
}
}
assert_run_ok(cwd, &["apply"], "apply");
for (rel_path, info) in files {
let after_hash = info["afterHash"]
.as_str()
.expect("afterHash should be a string");
let full_path = site_packages.join(rel_path);
assert_eq!(
git_sha256_file(&full_path),
after_hash,
"{rel_path} should match afterHash after re-apply"
);
}
assert_run_ok(cwd, &["remove", PYPI_UUID], "remove");
let manifest: serde_json::Value =
serde_json::from_str(&std::fs::read_to_string(&manifest_path).unwrap()).unwrap();
assert!(
manifest["patches"].as_object().unwrap().is_empty(),
"manifest should be empty after remove"
);
}
#[test]
#[ignore]
fn test_pypi_dry_run() {
if !has_python3() {
eprintln!("SKIP: python3 not found on PATH");
return;
}
let dir = tempfile::tempdir().unwrap();
let cwd = dir.path();
setup_venv(cwd);
let site_packages = find_site_packages(cwd);
let messages_py = site_packages.join("pydantic_ai/messages.py");
assert!(messages_py.exists());
let original_hash = git_sha256_file(&messages_py);
assert_run_ok(cwd, &["get", PYPI_UUID, "--no-apply"], "get --no-apply");
assert_eq!(
git_sha256_file(&messages_py),
original_hash,
"file should not change after get --no-apply"
);
assert_run_ok(cwd, &["apply", "--dry-run"], "apply --dry-run");
assert_eq!(
git_sha256_file(&messages_py),
original_hash,
"file should not change after apply --dry-run"
);
assert_run_ok(cwd, &["apply"], "apply");
let manifest_path = cwd.join(".socket/manifest.json");
let (_, files_value) = read_patch_files(&manifest_path);
let files = files_value.as_object().unwrap();
let after_hash = files["pydantic_ai/messages.py"]["afterHash"]
.as_str()
.unwrap();
assert_eq!(
git_sha256_file(&messages_py),
after_hash,
"file should match afterHash after real apply"
);
}
#[test]
#[ignore]
fn test_pypi_global_lifecycle() {
if !has_python3() {
eprintln!("SKIP: python3 not found on PATH");
return;
}
let global_dir = tempfile::tempdir().unwrap();
let cwd_dir = tempfile::tempdir().unwrap();
let cwd = cwd_dir.path();
let out = Command::new("python3")
.args([
"-m",
"pip",
"install",
"--target",
global_dir.path().to_str().unwrap(),
"--no-deps",
"--disable-pip-version-check",
"pydantic-ai==0.0.36",
"pydantic-ai-slim==0.0.36",
])
.output()
.expect("failed to run pip install --target");
assert!(
out.status.success(),
"pip install --target failed.\nstdout:\n{}\nstderr:\n{}",
String::from_utf8_lossy(&out.stdout),
String::from_utf8_lossy(&out.stderr),
);
assert!(
global_dir.path().join("pydantic_ai").exists(),
"pydantic_ai package should be installed in global_dir"
);
let gp_str = global_dir.path().to_str().unwrap();
let (stdout, _) = assert_run_ok(
cwd,
&["scan", "-g", "--global-prefix", gp_str, "--json"],
"scan -g --json",
);
let scan: serde_json::Value = serde_json::from_str(&stdout).unwrap();
let scanned = scan["scannedPackages"]
.as_u64()
.expect("scannedPackages should be a number");
assert!(scanned >= 1, "scan should find at least 1 package, got {scanned}");
assert_run_ok(
cwd,
&["get", PYPI_UUID, "-g", "--global-prefix", gp_str],
"get -g",
);
let manifest_path = cwd.join(".socket/manifest.json");
assert!(manifest_path.exists(), "manifest should exist after get");
let (_, files_value) = read_patch_files(&manifest_path);
let files = files_value.as_object().expect("files object");
for (rel_path, info) in files {
let after_hash = info["afterHash"].as_str().expect("afterHash");
let full_path = global_dir.path().join(rel_path);
assert!(full_path.exists(), "patched file should exist: {}", full_path.display());
assert_eq!(
git_sha256_file(&full_path),
after_hash,
"{rel_path} should match afterHash after global get"
);
}
assert_run_ok(
cwd,
&["rollback", "-g", "--global-prefix", gp_str],
"rollback -g",
);
for (rel_path, info) in files {
let before_hash = info["beforeHash"].as_str().unwrap_or("");
let full_path = global_dir.path().join(rel_path);
if before_hash.is_empty() {
assert!(
!full_path.exists(),
"new file {rel_path} should be removed after global rollback"
);
} else {
assert_eq!(
git_sha256_file(&full_path),
before_hash,
"{rel_path} should match beforeHash after global rollback"
);
}
}
assert_run_ok(
cwd,
&["apply", "-g", "--global-prefix", gp_str],
"apply -g",
);
for (rel_path, info) in files {
let after_hash = info["afterHash"].as_str().expect("afterHash");
let full_path = global_dir.path().join(rel_path);
assert_eq!(
git_sha256_file(&full_path),
after_hash,
"{rel_path} should match afterHash after global apply"
);
}
assert_run_ok(
cwd,
&["remove", PYPI_UUID, "-g", "--global-prefix", gp_str],
"remove -g",
);
let manifest: serde_json::Value =
serde_json::from_str(&std::fs::read_to_string(&manifest_path).unwrap()).unwrap();
assert!(
manifest["patches"].as_object().unwrap().is_empty(),
"manifest should be empty after global remove"
);
}
#[test]
#[ignore]
fn test_pypi_save_only() {
if !has_python3() {
eprintln!("SKIP: python3 not found on PATH");
return;
}
let dir = tempfile::tempdir().unwrap();
let cwd = dir.path();
setup_venv(cwd);
let site_packages = find_site_packages(cwd);
let messages_py = site_packages.join("pydantic_ai/messages.py");
assert!(messages_py.exists());
let original_hash = git_sha256_file(&messages_py);
assert_run_ok(cwd, &["get", PYPI_UUID, "--save-only"], "get --save-only");
assert_eq!(
git_sha256_file(&messages_py),
original_hash,
"file should not change after get --save-only"
);
let manifest_path = cwd.join(".socket/manifest.json");
assert!(manifest_path.exists(), "manifest should exist after get --save-only");
let (purl, _) = read_patch_files(&manifest_path);
assert!(
purl.starts_with(PYPI_PURL_PREFIX),
"manifest should contain a pydantic-ai patch"
);
assert_run_ok(cwd, &["apply"], "apply");
let (_, files_value) = read_patch_files(&manifest_path);
let files = files_value.as_object().unwrap();
let after_hash = files["pydantic_ai/messages.py"]["afterHash"]
.as_str()
.unwrap();
assert_eq!(
git_sha256_file(&messages_py),
after_hash,
"file should match afterHash after apply"
);
}
#[cfg(target_os = "macos")]
#[test]
#[ignore]
fn test_pypi_macos_global_auto_discovery() {
if !has_python3() {
eprintln!("SKIP: python3 not found on PATH");
return;
}
let cwd_dir = tempfile::tempdir().unwrap();
let cwd = cwd_dir.path();
let (code, stdout, stderr) = run(cwd, &["scan", "-g", "--json"]);
assert_eq!(
code, 0,
"scan -g --json failed (exit {code}).\nstdout:\n{stdout}\nstderr:\n{stderr}"
);
let scan: serde_json::Value = serde_json::from_str(&stdout)
.unwrap_or_else(|e| panic!("invalid JSON from scan -g: {e}\nstdout:\n{stdout}"));
assert!(
scan["scannedPackages"].is_u64(),
"scannedPackages should be a number, got: {}",
scan["scannedPackages"]
);
}
#[test]
#[ignore]
fn test_pypi_uuid_shortcut() {
if !has_python3() {
eprintln!("SKIP: python3 not found on PATH");
return;
}
let dir = tempfile::tempdir().unwrap();
let cwd = dir.path();
setup_venv(cwd);
let site_packages = find_site_packages(cwd);
assert!(site_packages.join("pydantic_ai").exists());
assert_run_ok(cwd, &[PYPI_UUID], "uuid shortcut");
let manifest_path = cwd.join(".socket/manifest.json");
assert!(manifest_path.exists(), "manifest should exist after UUID shortcut");
let (_, files_value) = read_patch_files(&manifest_path);
let files = files_value.as_object().expect("files object");
for (rel_path, info) in files {
let after_hash = info["afterHash"].as_str().expect("afterHash");
let full_path = site_packages.join(rel_path);
assert_eq!(
git_sha256_file(&full_path),
after_hash,
"{rel_path} should match afterHash after UUID shortcut"
);
}
}