use crate::ports::outbound::LockfileParseResult;
use crate::sbom_generation::domain::Package;
use crate::shared::error::SbomError;
use crate::shared::Result;
use serde::Deserialize;
use std::collections::{HashMap, HashSet, VecDeque};
use std::path::Path;
/// A single dependency reference inside a `uv.lock` package entry.
///
/// Only the `name` key is modelled here; the struct does not opt into
/// `deny_unknown_fields`, so other keys on the entry are not rejected.
#[derive(Debug, Deserialize)]
struct UvDependency {
/// Name of the package being depended on.
name: String,
}
/// Development dependencies attached to a `uv.lock` package entry
/// (used as the type of the `dev-dependencies` field of a package).
#[derive(Debug, Deserialize)]
struct DevDependencies {
/// Entries listed under the `dev` key; an absent key deserializes to an empty list.
#[serde(default)]
dev: Vec<UvDependency>,
}
/// The `source` table of a `uv.lock` package entry, reduced to the two keys
/// this module inspects.
#[derive(Debug, Deserialize)]
struct PackageSource {
/// Value of the `editable` key, when present.
editable: Option<String>,
/// Value of the `virtual` key, when present. The field is renamed because
/// `virtual` is a reserved word in Rust.
#[serde(rename = "virtual")]
virtual_path: Option<String>,
}
impl PackageSource {
    /// Reports whether this source carries an `editable` or a `virtual` entry.
    fn is_local(&self) -> bool {
        let has_editable = self.editable.is_some();
        let has_virtual = self.virtual_path.is_some();
        has_editable || has_virtual
    }
}
pub fn parse_lockfile_content(content: &str, project_path: &Path) -> Result<LockfileParseResult> {
#[derive(Debug, Deserialize)]
struct UvPackage {
name: String,
version: String,
#[serde(default)]
dependencies: Vec<UvDependency>,
#[serde(default, rename = "dev-dependencies")]
dev_dependencies: Option<DevDependencies>,
}
#[derive(Debug, Deserialize)]
struct UvLock {
package: Vec<UvPackage>,
}
let lockfile: UvLock = toml::from_str(content).map_err(|e| SbomError::LockfileParseError {
path: project_path.join("uv.lock"),
details: e.to_string(),
})?;
let mut packages = Vec::new();
let mut dependency_map = HashMap::new();
for pkg in lockfile.package {
packages.push(Package::new(pkg.name.clone(), pkg.version.clone())?);
let mut deps = Vec::new();
for dep in &pkg.dependencies {
deps.push(dep.name.clone());
}
if let Some(dev_deps) = &pkg.dev_dependencies {
for dep in &dev_deps.dev {
deps.push(dep.name.clone());
}
}
dependency_map.insert(pkg.name, deps);
}
Ok((packages, dependency_map))
}
pub fn parse_lockfile_content_for_member(
content: &str,
project_path: &Path,
member_name: &str,
) -> Result<LockfileParseResult> {
#[derive(Debug, Deserialize)]
struct UvPackage {
name: String,
version: String,
#[serde(default)]
dependencies: Vec<UvDependency>,
#[serde(default, rename = "dev-dependencies")]
dev_dependencies: Option<DevDependencies>,
source: Option<PackageSource>,
}
#[derive(Debug, Deserialize)]
struct UvLock {
package: Vec<UvPackage>,
}
let lockfile: UvLock = toml::from_str(content).map_err(|e| SbomError::LockfileParseError {
path: project_path.join("uv.lock"),
details: e.to_string(),
})?;
let mut full_dep_map: HashMap<String, Vec<String>> = HashMap::new();
let mut pkg_lookup: HashMap<String, (String, String)> = HashMap::new();
let mut member_direct_deps: Option<Vec<String>> = None;
for pkg in &lockfile.package {
let deps = collect_all_deps(&pkg.dependencies, pkg.dev_dependencies.as_ref());
let is_member_root =
pkg.name == member_name && pkg.source.as_ref().map(|s| s.is_local()).unwrap_or(false);
if is_member_root {
member_direct_deps = Some(deps.clone());
}
full_dep_map.insert(pkg.name.clone(), deps);
pkg_lookup.insert(pkg.name.clone(), (pkg.name.clone(), pkg.version.clone()));
}
let direct_deps = member_direct_deps.ok_or_else(|| {
anyhow::anyhow!(
"Workspace member '{}' not found in uv.lock (no package with source.editable or source.virtual set)",
member_name
)
})?;
let visited = bfs_reachable(&full_dep_map, direct_deps);
let mut packages = Vec::new();
let mut dependency_map = HashMap::new();
for name in &visited {
if let Some((pkg_name, pkg_version)) = pkg_lookup.get(name) {
packages.push(Package::new(pkg_name.clone(), pkg_version.clone())?);
if let Some(deps) = full_dep_map.get(name) {
dependency_map.insert(pkg_name.clone(), deps.clone());
}
}
}
Ok((packages, dependency_map))
}
/// Flattens one package's dependency entries into a list of package names.
///
/// The names from `dependencies` come first, followed by the names from the
/// `dev` list of `dev_dependencies` when it is present. Order within each
/// group is preserved and duplicates are kept.
fn collect_all_deps(
    dependencies: &[UvDependency],
    dev_dependencies: Option<&DevDependencies>,
) -> Vec<String> {
    let dev_entries = dev_dependencies
        .into_iter()
        .flat_map(|group| group.dev.iter());
    dependencies
        .iter()
        .chain(dev_entries)
        .map(|entry| entry.name.clone())
        .collect()
}
/// Returns every name reachable from `seeds` by following the edges in `dep_map`.
///
/// The seeds themselves are part of the result. A name without an entry in
/// `dep_map` is still returned but contributes no further edges. Each name is
/// enqueued at most once, so cyclic graphs terminate.
fn bfs_reachable(dep_map: &HashMap<String, Vec<String>>, seeds: Vec<String>) -> HashSet<String> {
    let mut visited: HashSet<String> = HashSet::with_capacity(seeds.len());
    let mut queue: VecDeque<String> = VecDeque::with_capacity(seeds.len());
    for seed in seeds {
        // Check first so a repeated seed is neither cloned nor queued twice.
        if !visited.contains(&seed) {
            visited.insert(seed.clone());
            queue.push_back(seed);
        }
    }
    while let Some(current) = queue.pop_front() {
        if let Some(deps) = dep_map.get(&current) {
            for dep in deps {
                if !visited.contains(dep) {
                    visited.insert(dep.clone());
                    queue.push_back(dep.clone());
                }
            }
        }
    }
    visited
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;
    use std::path::Path;

    /// Two registry packages where `requests` depends on `urllib3`.
    const SIMPLE_LOCK: &str = r#"
version = 1
requires-python = ">=3.11"
[[package]]
name = "requests"
version = "2.31.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "urllib3" },
]
[[package]]
name = "urllib3"
version = "2.0.7"
source = { registry = "https://pypi.org/simple" }
"#;

    #[test]
    fn test_parse_lockfile_content_basic_returns_packages_and_deps() {
        let (packages, dep_map) =
            parse_lockfile_content(SIMPLE_LOCK, Path::new("/project")).unwrap();
        let names: HashSet<String> = packages.iter().map(|p| p.name().to_string()).collect();
        assert!(names.contains("requests"));
        assert!(names.contains("urllib3"));
        assert_eq!(dep_map["requests"], vec!["urllib3"]);
        assert!(dep_map["urllib3"].is_empty());
    }

    #[test]
    fn test_parse_lockfile_content_invalid_toml_returns_error() {
        let result = parse_lockfile_content("not valid toml [[[", Path::new("/project"));
        assert!(result.is_err());
    }

    // `package` is a required field of the lockfile model, so a lockfile that
    // has no `[[package]]` entry at all is rejected rather than parsed as empty.
    #[test]
    fn test_parse_lockfile_content_without_package_table_returns_error() {
        let content = "version = 1\n";
        let result = parse_lockfile_content(content, Path::new("/project"));
        assert!(result.is_err());
    }

    /// Workspace with two `editable` members (`alpha`, `beta`) and one registry
    /// package (`shared-lib`) that neither member depends on.
    const WORKSPACE_LOCK_FOR_MEMBER: &str = r#"
version = 1
requires-python = ">=3.11"
[manifest]
members = [
"packages/alpha",
"packages/beta",
]
[[package]]
name = "alpha"
version = "0.1.0"
source = { editable = "packages/alpha" }
dependencies = [
{ name = "certifi" },
{ name = "requests" },
]
[[package]]
name = "beta"
version = "0.2.0"
source = { editable = "packages/beta" }
dependencies = [
{ name = "urllib3" },
]
[[package]]
name = "certifi"
version = "2024.1.1"
source = { registry = "https://pypi.org/simple" }
[[package]]
name = "requests"
version = "2.31.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "urllib3" },
]
[[package]]
name = "shared-lib"
version = "1.0.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "certifi" },
]
[[package]]
name = "urllib3"
version = "2.0.7"
source = { registry = "https://pypi.org/simple" }
"#;

    #[test]
    fn test_parse_lockfile_for_member_returns_correct_subtree_for_alpha() {
        let (packages, dep_map) = parse_lockfile_content_for_member(
            WORKSPACE_LOCK_FOR_MEMBER,
            Path::new("/workspace"),
            "alpha",
        )
        .unwrap();
        let names: HashSet<String> = packages.iter().map(|p| p.name().to_string()).collect();
        assert!(!names.contains("alpha"), "member root must be excluded");
        assert!(!names.contains("beta"), "sibling member must be excluded");
        assert!(
            !names.contains("shared-lib"),
            "unreachable package must be excluded"
        );
        assert!(names.contains("requests"));
        assert!(names.contains("urllib3"));
        assert!(names.contains("certifi"));
        assert!(dep_map.contains_key("requests"));
        assert!(dep_map.contains_key("urllib3"));
        assert!(dep_map.contains_key("certifi"));
    }

    #[test]
    fn test_parse_lockfile_for_member_returns_correct_subtree_for_beta() {
        let (packages, _dep_map) = parse_lockfile_content_for_member(
            WORKSPACE_LOCK_FOR_MEMBER,
            Path::new("/workspace"),
            "beta",
        )
        .unwrap();
        let names: HashSet<String> = packages.iter().map(|p| p.name().to_string()).collect();
        assert!(!names.contains("beta"), "member root must be excluded");
        assert!(!names.contains("alpha"), "sibling member must be excluded");
        assert!(!names.contains("requests"), "unreachable from beta");
        assert!(!names.contains("certifi"), "unreachable from beta");
        assert!(!names.contains("shared-lib"), "unreachable from beta");
        assert!(names.contains("urllib3"));
    }

    #[test]
    fn test_parse_lockfile_for_member_member_root_excluded() {
        let (packages, _) = parse_lockfile_content_for_member(
            WORKSPACE_LOCK_FOR_MEMBER,
            Path::new("/workspace"),
            "alpha",
        )
        .unwrap();
        let names: Vec<String> = packages.iter().map(|p| p.name().to_string()).collect();
        assert!(
            !names.contains(&"alpha".to_string()),
            "member root must not appear in result"
        );
    }

    #[test]
    fn test_parse_lockfile_for_member_nonexistent_member_returns_error() {
        let result = parse_lockfile_content_for_member(
            WORKSPACE_LOCK_FOR_MEMBER,
            Path::new("/workspace"),
            "nonexistent-member",
        );
        assert!(result.is_err());
        let err_string = result.unwrap_err().to_string();
        assert!(
            err_string.contains("nonexistent-member"),
            "error must mention the missing member name"
        );
    }

    /// Workspace whose members (`api`, `worker`) are marked with
    /// `source = { virtual = ... }` instead of `editable`.
    const WORKSPACE_LOCK_VIRTUAL_FORMAT: &str = r#"
version = 1
revision = 3
requires-python = ">=3.11"
[manifest]
members = [
"api",
"worker",
]
[[package]]
name = "api"
version = "0.1.0"
source = { virtual = "packages/api" }
dependencies = [
{ name = "fastapi" },
{ name = "requests" },
]
[[package]]
name = "celery"
version = "5.4.0"
source = { registry = "https://pypi.org/simple" }
[[package]]
name = "fastapi"
version = "0.115.0"
source = { registry = "https://pypi.org/simple" }
[[package]]
name = "requests"
version = "2.32.3"
source = { registry = "https://pypi.org/simple" }
[[package]]
name = "worker"
version = "0.1.0"
source = { virtual = "packages/worker" }
dependencies = [
{ name = "celery" },
]
"#;

    #[test]
    fn test_parse_lockfile_for_member_handles_virtual_source_for_api() {
        let (packages, _) = parse_lockfile_content_for_member(
            WORKSPACE_LOCK_VIRTUAL_FORMAT,
            Path::new("/workspace"),
            "api",
        )
        .unwrap();
        let names: HashSet<String> = packages.iter().map(|p| p.name().to_string()).collect();
        assert!(!names.contains("api"), "member root must be excluded");
        assert!(!names.contains("worker"), "sibling member must be excluded");
        assert!(names.contains("requests"));
        assert!(names.contains("fastapi"));
        assert!(!names.contains("celery"), "unreachable from api");
    }

    #[test]
    fn test_parse_lockfile_for_member_handles_virtual_source_for_worker() {
        let (packages, _) = parse_lockfile_content_for_member(
            WORKSPACE_LOCK_VIRTUAL_FORMAT,
            Path::new("/workspace"),
            "worker",
        )
        .unwrap();
        let names: HashSet<String> = packages.iter().map(|p| p.name().to_string()).collect();
        assert!(!names.contains("worker"), "member root must be excluded");
        assert!(!names.contains("api"), "sibling member must be excluded");
        assert!(names.contains("celery"));
        assert!(!names.contains("requests"), "unreachable from worker");
        assert!(!names.contains("fastapi"), "unreachable from worker");
    }
}