use std::path::{Path, PathBuf};
use std::process::Command;
use base64::Engine;
use serial_test::serial;
use sha2::{Digest, Sha256};
use socket_patch_cli::commands::apply::{run as apply_run, ApplyArgs};
use socket_patch_cli::commands::scan::{run as scan_run, ScanArgs};
use wiremock::matchers::{method, path, path_regex};
use wiremock::{Mock, MockServer, ResponseTemplate};
const ORG: &str = "test-org";
const UUID: &str = "12121212-1212-4121-8121-121212121212";
const PYPI_PACKAGE: &str = "six";
const PYPI_VERSION: &str = "1.16.0";
fn git_sha256(content: &[u8]) -> String {
let header = format!("blob {}\0", content.len());
let mut hasher = Sha256::new();
hasher.update(header.as_bytes());
hasher.update(content);
hex::encode(hasher.finalize())
}
fn find_python() -> Option<&'static str> {
for cmd in ["python3", "python", "py"] {
let ok = Command::new(cmd)
.arg("--version")
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.status()
.map(|s| s.success())
.unwrap_or(false);
if ok {
return Some(cmd);
}
}
None
}
fn has_python3() -> bool {
find_python().is_some()
}
fn venv_pip(venv: &Path) -> PathBuf {
if cfg!(windows) {
venv.join("Scripts").join("pip.exe")
} else {
venv.join("bin").join("pip")
}
}
fn install_six(tmp: &Path) -> PathBuf {
let venv = tmp.join(".venv");
let python = find_python().expect("python interpreter not on PATH");
let status = Command::new(python)
.args(["-m", "venv", venv.to_str().unwrap()])
.status()
.expect("python venv");
assert!(status.success(), "failed to create venv");
let pip = venv_pip(&venv);
let status = Command::new(&pip)
.args([
"install",
"--disable-pip-version-check",
"--quiet",
"--no-cache-dir",
&format!("{PYPI_PACKAGE}=={PYPI_VERSION}"),
])
.status()
.expect("pip install");
assert!(status.success(), "failed to install {PYPI_PACKAGE}");
let candidate = find_site_packages(&venv).join("six.py");
assert!(
candidate.exists(),
"six.py not found at {} after pip install",
candidate.display()
);
candidate
}
fn find_site_packages(venv: &Path) -> PathBuf {
if cfg!(windows) {
let sp = venv.join("Lib").join("site-packages");
assert!(
sp.exists(),
"Windows venv site-packages not found at {}",
sp.display()
);
sp
} else {
let lib = venv.join("lib");
for entry in std::fs::read_dir(&lib).expect("lib dir").flatten() {
let sp = entry.path().join("site-packages");
if sp.exists() {
return sp;
}
}
panic!("site-packages not found under {}", lib.display());
}
}
async fn setup_pypi_apply_mock(
server: &MockServer,
before_hash: &str,
after_hash: &str,
patched_bytes: &[u8],
) {
let purl = format!("pkg:pypi/{PYPI_PACKAGE}@{PYPI_VERSION}");
let blob_b64 = base64::engine::general_purpose::STANDARD.encode(patched_bytes);
Mock::given(method("POST"))
.and(path(format!("/v0/orgs/{ORG}/patches/batch")))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"packages": [{
"purl": purl,
"patches": [{
"uuid": UUID, "purl": purl,
"tier": "free", "cveIds": [], "ghsaIds": [],
"severity": "high", "title": "pypi e2e fixture"
}]
}],
"canAccessPaidPatches": false,
})))
.mount(server)
.await;
Mock::given(method("GET"))
.and(path_regex(format!("^/v0/orgs/{ORG}/patches/by-package/.+$")))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"patches": [{
"uuid": UUID, "purl": purl,
"publishedAt": "2024-01-01T00:00:00Z",
"description": "x", "license": "MIT", "tier": "free",
"vulnerabilities": {}
}],
"canAccessPaidPatches": false,
})))
.mount(server)
.await;
Mock::given(method("GET"))
.and(path(format!("/v0/orgs/{ORG}/patches/view/{UUID}")))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"uuid": UUID,
"purl": purl,
"publishedAt": "2024-01-01T00:00:00Z",
"files": {
"six.py": {
"beforeHash": before_hash,
"afterHash": after_hash,
"blobContent": blob_b64,
}
},
"vulnerabilities": {},
"description": "pypi e2e fixture",
"license": "MIT",
"tier": "free",
})))
.mount(server)
.await;
}
#[tokio::test]
#[serial]
async fn pypi_install_scan_sync_patches_real_file() {
if !has_python3() {
println!("SKIP: python3 not on PATH");
return;
}
let tmp = tempfile::tempdir().expect("tempdir");
let six_path = install_six(tmp.path());
let original = std::fs::read(&six_path).expect("read six.py");
let before_hash = git_sha256(&original);
let mut patched = original.clone();
patched.extend_from_slice(b"\n# SOCKET-PATCH-E2E-MARKER\n");
let after_hash = git_sha256(&patched);
let server = MockServer::start().await;
setup_pypi_apply_mock(&server, &before_hash, &after_hash, &patched).await;
let mut args = ScanArgs {
common: socket_patch_cli::args::GlobalArgs {
cwd: tmp.path().to_path_buf(),
org: Some(ORG.to_string()),
json: true,
yes: true,
global: false,
global_prefix: None,
api_url: server.uri(),
api_token: Some("fake".to_string()),
ecosystems: Some(vec!["pypi".to_string()]),
download_mode: "diff".to_string(),
dry_run: false,
..socket_patch_cli::args::GlobalArgs::default()
},
batch_size: 100,
apply: false,
prune: false,
sync: true,
all_releases: false,
};
let _ = &mut args;
let code = scan_run(args).await;
assert!(code == 0 || code == 1, "scan --sync exit: {code}");
let after = std::fs::read(&six_path).expect("read patched six.py");
assert!(
after.windows(b"SOCKET-PATCH-E2E-MARKER".len())
.any(|w| w == b"SOCKET-PATCH-E2E-MARKER"),
"patched marker not found in {}; file size: {}",
six_path.display(),
after.len()
);
}
#[tokio::test]
#[serial]
async fn pypi_scan_then_apply_force_patches_real_file() {
if !has_python3() {
println!("SKIP: python3 not on PATH");
return;
}
let tmp = tempfile::tempdir().expect("tempdir");
let six_path = install_six(tmp.path());
let original = std::fs::read(&six_path).expect("read six.py");
let before_hash = git_sha256(&original);
let mut patched = original.clone();
patched.extend_from_slice(b"\n# SOCKET-PATCH-MARKER-APPLY-FORCE\n");
let after_hash = git_sha256(&patched);
let server = MockServer::start().await;
setup_pypi_apply_mock(&server, &before_hash, &after_hash, &patched).await;
let scan_args = ScanArgs {
common: socket_patch_cli::args::GlobalArgs {
cwd: tmp.path().to_path_buf(),
org: Some(ORG.to_string()),
json: true,
yes: true,
global: false,
global_prefix: None,
api_url: server.uri(),
api_token: Some("fake".to_string()),
ecosystems: Some(vec!["pypi".to_string()]),
download_mode: "diff".to_string(),
dry_run: false,
..socket_patch_cli::args::GlobalArgs::default()
},
batch_size: 100,
apply: false,
prune: false,
sync: true,
all_releases: false,
};
let _ = scan_run(scan_args).await;
let apply_args = ApplyArgs {
common: socket_patch_cli::args::GlobalArgs {
cwd: tmp.path().to_path_buf(),
dry_run: false,
silent: true,
manifest_path: ".socket/manifest.json".to_string(),
offline: true,
global: false,
global_prefix: None,
ecosystems: Some(vec!["pypi".to_string()]),
json: true,
verbose: false,
download_mode: "diff".to_string(),
..socket_patch_cli::args::GlobalArgs::default()
},
force: true,
};
let _ = apply_run(apply_args).await;
let after = std::fs::read(&six_path).expect("read after apply");
assert!(
after.windows(b"SOCKET-PATCH-MARKER-APPLY-FORCE".len())
.any(|w| w == b"SOCKET-PATCH-MARKER-APPLY-FORCE"),
"marker not found post-apply"
);
}
#[tokio::test]
#[serial]
async fn pypi_apply_dry_run_does_not_modify_file() {
if !has_python3() {
println!("SKIP: python3 not on PATH");
return;
}
let tmp = tempfile::tempdir().expect("tempdir");
let six_path = install_six(tmp.path());
let original = std::fs::read(&six_path).expect("read six.py");
let before_hash = git_sha256(&original);
let mut patched = original.clone();
patched.extend_from_slice(b"\n# DRY-RUN-MARKER\n");
let after_hash = git_sha256(&patched);
let server = MockServer::start().await;
setup_pypi_apply_mock(&server, &before_hash, &after_hash, &patched).await;
let scan_args = ScanArgs {
common: socket_patch_cli::args::GlobalArgs {
cwd: tmp.path().to_path_buf(),
org: Some(ORG.to_string()),
json: true,
yes: true,
global: false,
global_prefix: None,
api_url: server.uri(),
api_token: Some("fake".to_string()),
ecosystems: Some(vec!["pypi".to_string()]),
download_mode: "diff".to_string(),
dry_run: true,
..socket_patch_cli::args::GlobalArgs::default()
},
batch_size: 100,
apply: true,
prune: false,
sync: false,
all_releases: false,
};
let _ = scan_run(scan_args).await;
let after = std::fs::read(&six_path).expect("read after dry-run");
assert_eq!(
after, original,
"dry-run must not modify the installed file"
);
}
#[tokio::test]
#[serial]
async fn pypi_crawler_finds_real_installed_six() {
if !has_python3() {
println!("SKIP: python3 not on PATH");
return;
}
let tmp = tempfile::tempdir().expect("tempdir");
let _ = install_six(tmp.path());
let site_packages = find_site_packages(&tmp.path().join(".venv"));
let has_dist_info = std::fs::read_dir(&site_packages)
.expect("site-packages")
.flatten()
.any(|e| {
e.file_name()
.to_string_lossy()
.starts_with("six-1.16.0")
});
assert!(has_dist_info, "six-1.16.0.dist-info should be present");
let server = MockServer::start().await;
let purl = format!("pkg:pypi/{PYPI_PACKAGE}@{PYPI_VERSION}");
Mock::given(method("POST"))
.and(path(format!("/v0/orgs/{ORG}/patches/batch")))
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
"packages": [{
"purl": purl,
"patches": [{
"uuid": UUID, "purl": purl, "tier": "free",
"cveIds": [], "ghsaIds": [], "severity": "low",
"title": "discovery sanity"
}]
}],
"canAccessPaidPatches": false,
})))
.mount(&server)
.await;
let args = ScanArgs {
common: socket_patch_cli::args::GlobalArgs {
cwd: tmp.path().to_path_buf(),
org: Some(ORG.to_string()),
json: true,
yes: true,
global: false,
global_prefix: None,
api_url: server.uri(),
api_token: Some("fake".to_string()),
ecosystems: Some(vec!["pypi".to_string()]),
download_mode: "diff".to_string(),
dry_run: false,
..socket_patch_cli::args::GlobalArgs::default()
},
batch_size: 100,
apply: false,
prune: false,
sync: false,
all_releases: false,
};
assert_eq!(scan_run(args).await, 0);
}