use std::collections::HashMap;
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use serde_json::json;
use wiremock::{Request, ResponseTemplate};
use uv_distribution_filename::WheelFilename;
use uv_normalize::PackageName;
use uv_pep440::VersionSpecifiers;
use crate::http_server::{HttpServer, content_type_for_filename};
use crate::vendor::{VendorArtifact, vendor_artifacts};
use super::scenario::{Scenario, WheelTag};
use super::scenarios_dir;
use super::wheel::{generate_sdist, generate_wheel, sha256_hex};
/// Fixed timestamp stored as the `upload_time` of every distribution placed in the index, so
/// the `upload-time` value served for a file does not vary between runs.
const PACKSE_UPLOAD_TIME: &str = "2024-03-24T00:00:00Z";
/// Metadata for a single distribution file, as listed in a package's Simple API JSON response.
struct DistInfo {
/// File name of the distribution; also used as the final segment of its `/files/` URL.
filename: String,
/// Digest string published under `hashes.sha256`.
sha256: String,
/// Published as `requires-python`; the key is left out of the response when this is `None`.
requires_python: Option<VersionSpecifiers>,
/// Published as `upload-time`.
upload_time: &'static str,
/// When `true`, the file entry carries `"yanked": true`; otherwise the key is left out.
yanked: bool,
}
/// All distributions known for one package name.
struct PackageEntry {
/// Distribution files listed in the package's Simple API response, in insertion order.
dists: Vec<DistInfo>,
}
/// Where the contents of a downloadable file come from.
enum FileData {
/// Contents already held in memory behind a shared, reference-counted slice.
Bytes(Arc<[u8]>),
/// Contents owned by a static vendored artifact and obtained through its `bytes()` method
/// when the file is requested.
Vendor(&'static VendorArtifact),
}
impl FileData {
    /// Returns the contents of this file as a shared byte slice.
    ///
    /// In-memory contents are handed out by cloning the `Arc` (no byte copy). Vendored
    /// artifacts delegate to the artifact's own `bytes()` and forward its result unchanged,
    /// including any error.
    fn bytes(&self) -> anyhow::Result<Arc<[u8]>> {
        match self {
            Self::Vendor(vendored) => vendored.bytes(),
            Self::Bytes(contents) => {
                let shared = Arc::clone(contents);
                Ok(shared)
            }
        }
    }
}
/// Lookup tables consulted by the request handler.
struct ServerIndex {
/// Per-package distribution listings, used to answer `/simple/{package}/` requests.
packages: HashMap<PackageName, PackageEntry>,
/// File contents keyed by file name, used to answer `/files/{filename}` requests.
files: HashMap<String, FileData>,
}
/// A package index server backed by a Packse scenario and served over HTTP.
pub struct PackseServer {
/// The underlying HTTP server; its URL is the base of [`PackseServer::index_url`].
server: HttpServer,
}
impl PackseServer {
    /// Starts a server for the scenario file at `scenario_path`, resolved relative to
    /// `scenarios_dir()`.
    ///
    /// # Panics
    ///
    /// Panics if `Scenario::from_path` fails for the resolved path.
    pub fn new(scenario_path: &str) -> Self {
        let location = scenarios_dir().join(scenario_path);
        let loaded = Scenario::from_path(&location);
        let scenario = loaded.expect("vendored Packse scenario should parse");
        Self::from_scenario(&scenario)
    }

    /// Starts a server for `Scenario::empty()`.
    pub fn empty() -> Self {
        let scenario = Scenario::empty();
        Self::from_scenario(&scenario)
    }

    /// Builds the index for `scenario` once and starts an HTTP server that answers every
    /// request from that shared index.
    pub fn from_scenario(scenario: &Scenario) -> Self {
        let shared_index = Arc::new(build_server_index(scenario));
        // The closure is written inline so its argument types are taken from
        // `HttpServer::start`'s signature.
        Self {
            server: HttpServer::start(move |request, server_uri| {
                handle_request(request, server_uri, &shared_index)
            }),
        }
    }

    /// Returns the URL of the Simple API root: the server URL followed by `/simple/`.
    pub fn index_url(&self) -> String {
        let base = self.server.url();
        format!("{base}/simple/")
    }
}
fn build_server_index(scenario: &Scenario) -> ServerIndex {
let mut packages = HashMap::new();
let mut files: HashMap<String, FileData> = HashMap::new();
for (package_name, package) in &scenario.packages {
let mut dists = Vec::new();
for (version, meta) in &package.versions {
if meta.wheel {
let tags = if meta.wheel_tags.is_empty() {
vec!["py3-none-any"]
} else {
meta.wheel_tags.iter().map(WheelTag::as_str).collect()
};
for tag in tags {
let (filename, bytes) = generate_wheel(
package_name,
version,
&meta.requires,
&meta.extras,
meta.requires_python.as_ref(),
tag,
);
let sha256 = sha256_hex(&bytes);
files.insert(filename.clone(), FileData::Bytes(bytes.into()));
dists.push(DistInfo {
filename,
sha256,
requires_python: meta.requires_python.clone(),
upload_time: PACKSE_UPLOAD_TIME,
yanked: meta.yanked,
});
}
}
if meta.sdist {
let (filename, bytes) = generate_sdist(
package_name,
version,
&meta.requires,
&meta.extras,
meta.requires_python.as_ref(),
);
let sha256 = sha256_hex(&bytes);
files.insert(filename.clone(), FileData::Bytes(bytes.into()));
dists.push(DistInfo {
filename,
sha256,
requires_python: meta.requires_python.clone(),
upload_time: PACKSE_UPLOAD_TIME,
yanked: meta.yanked,
});
}
}
packages.insert(package_name.clone(), PackageEntry { dists });
}
for artifact in vendor_artifacts() {
if !Path::new(artifact.filename)
.extension()
.is_some_and(|extension| extension.eq_ignore_ascii_case("whl"))
{
continue;
}
let wheel_filename =
WheelFilename::from_str(artifact.filename).expect("invalid vendor wheel filename");
files.insert(artifact.filename.to_string(), FileData::Vendor(artifact));
packages
.entry(wheel_filename.name)
.or_insert_with(|| PackageEntry { dists: Vec::new() })
.dists
.push(DistInfo {
filename: artifact.filename.to_string(),
sha256: artifact.sha256.to_string(),
requires_python: None,
upload_time: PACKSE_UPLOAD_TIME,
yanked: false,
});
}
ServerIndex { packages, files }
}
/// Routes one incoming request against `index` and produces the response to send.
///
/// * `/simple/{package}` (with or without a trailing slash) returns the package's Simple API
///   JSON listing, or `404` when the name is invalid or unknown. The listing reports the
///   *normalized* package name rather than echoing the spelling used in the URL.
/// * `/files/{filename}` returns the file contents with a content type derived from the
///   file name, `404` when the file is unknown, or `500` with the error chain as the body
///   when the contents cannot be obtained.
/// * Anything else returns `404`.
///
/// `server_uri` is the base URI used to build absolute file URLs in listings.
fn handle_request(req: &Request, server_uri: &str, index: &ServerIndex) -> ResponseTemplate {
    let path = req.url.path();

    if let Some(pkg) = extract_package_name(path) {
        let Ok(package_name) = PackageName::from_str(pkg) else {
            return ResponseTemplate::new(404);
        };
        return match index.packages.get(&package_name) {
            // The lookup is keyed by the normalized name, so report that same name in the
            // response instead of whatever spelling the client used in the URL.
            Some(entry) => build_simple_api_response(package_name.as_ref(), entry, server_uri),
            None => ResponseTemplate::new(404),
        };
    }

    if let Some(filename) = path.strip_prefix("/files/") {
        let Some(file) = index.files.get(filename) else {
            return ResponseTemplate::new(404);
        };
        return match file.bytes() {
            Ok(bytes) => ResponseTemplate::new(200)
                .set_body_raw(bytes.to_vec(), content_type_for_filename(filename)),
            Err(error) => ResponseTemplate::new(500).set_body_string(format!("{error:#}")),
        };
    }

    ResponseTemplate::new(404)
}
/// Renders the Simple API JSON listing for one package as a `200` response.
///
/// Each distribution in `entry` becomes one object in `files` with `filename`, an absolute
/// `url` of the form `{server_uri}/files/{filename}`, `hashes.sha256`, and `upload-time`.
/// `requires-python` is added only when the distribution has a value for it, and `yanked`
/// only when the distribution is yanked. `package_name` is reported verbatim as `name`.
fn build_simple_api_response(
    package_name: &str,
    entry: &PackageEntry,
    server_uri: &str,
) -> ResponseTemplate {
    const SIMPLE_JSON_MIME: &str = "application/vnd.pypi.simple.v1+json";

    let mut files: Vec<serde_json::Value> = Vec::with_capacity(entry.dists.len());
    for dist in &entry.dists {
        let mut file_obj = json!({
            "filename": dist.filename,
            "url": format!("{server_uri}/files/{}", dist.filename),
            "hashes": {
                "sha256": dist.sha256,
            },
            "upload-time": dist.upload_time,
        });
        // Optional keys are appended after the mandatory ones, `requires-python` first.
        if let Some(specifiers) = &dist.requires_python {
            file_obj["requires-python"] = json!(specifiers);
        }
        if dist.yanked {
            file_obj["yanked"] = json!(true);
        }
        files.push(file_obj);
    }

    let payload = json!({
        "meta": { "api-version": "1.1" },
        "name": package_name,
        "files": files,
    });

    ResponseTemplate::new(200)
        .insert_header("Content-Type", SIMPLE_JSON_MIME)
        .set_body_raw(payload.to_string(), SIMPLE_JSON_MIME)
}
/// Extracts the package segment from a `/simple/{package}` or `/simple/{package}/` path.
///
/// Returns `None` when the path does not start with `/simple/`, when the segment is empty,
/// or when anything other than a single optional trailing slash follows the segment.
fn extract_package_name(path: &str) -> Option<&str> {
    let tail = path.strip_prefix("/simple/")?;
    // At most one trailing slash is tolerated; any slash left over means a nested path.
    let segment = match tail.strip_suffix('/') {
        Some(trimmed) => trimmed,
        None => tail,
    };
    Some(segment).filter(|name| !name.is_empty() && !name.contains('/'))
}
#[cfg(test)]
mod tests {
    use crate::vendor::vendor_artifacts;
    use super::{Scenario, build_server_index, extract_package_name};

    /// A single trailing slash after the package segment is optional.
    #[test]
    fn extract_package_name_accepts_with_or_without_trailing_slash() {
        for path in ["/simple/foo/", "/simple/foo"] {
            assert_eq!(extract_package_name(path), Some("foo"));
        }
    }

    /// The bare index root, a missing prefix slash, and nested paths are not package paths.
    #[test]
    fn extract_package_name_rejects_invalid_paths() {
        for path in ["/simple/", "/simple", "/simple/foo/bar"] {
            assert_eq!(extract_package_name(path), None);
        }
    }

    /// Building the index must not cause any vendored artifact to report itself as loaded.
    // NOTE(review): this inspects the artifacts returned by `vendor_artifacts()` directly; if
    // that state is shared with other tests in the same process, confirm none of them can
    // load an artifact concurrently.
    #[test]
    fn server_index_construction_does_not_load_vendor_artifacts() {
        let _index = build_server_index(&Scenario::empty());
        let any_loaded = vendor_artifacts()
            .iter()
            .any(|artifact| artifact.is_loaded());
        assert!(!any_loaded);
    }
}