import json
import subprocess
import sys
import urllib.request
from pathlib import Path
def cargo_metadata(repo: str) -> dict:
    """Return the parsed JSON emitted by ``cargo metadata`` for *repo*.

    Runs ``cargo metadata --format-version 1`` with *repo* as the working
    directory and decodes its standard output as JSON.

    Args:
        repo: Directory in which to run cargo.

    Returns:
        The decoded JSON document.

    Raises:
        subprocess.CalledProcessError: cargo exited with a non-zero status
            (``check=True``).
        OSError: the ``cargo`` executable or *repo* could not be used to
            start the process.
        json.JSONDecodeError: cargo's standard output was not valid JSON.
    """
    proc = subprocess.run(
        ["cargo", "metadata", "--format-version", "1"],
        cwd=repo,
        capture_output=True,
        # Decode explicitly as UTF-8 instead of relying on the locale's
        # preferred encoding (what ``text=True`` does): cargo writes UTF-8,
        # and a non-UTF-8 locale would otherwise corrupt or reject
        # non-ASCII metadata such as author names.
        encoding="utf-8",
        check=True,
    )
    return json.loads(proc.stdout)
def osv_vulns(pkgs: list[dict]) -> dict[tuple[str, str], list[dict]]:
    """Look up known vulnerabilities for *pkgs* via the OSV batch query API.

    Args:
        pkgs: Package records; each must provide ``"name"`` and
            ``"version"`` keys. They are queried in the ``crates.io``
            ecosystem.

    Returns:
        A mapping from ``(name, version)`` to the list of vulnerability
        records OSV returned for that package. Packages with no reported
        vulnerabilities are omitted. An empty *pkgs* yields an empty mapping
        without touching the network.

    Raises:
        urllib.error.URLError: the request could not be completed
            (``HTTPError`` for non-success HTTP statuses).
        json.JSONDecodeError: the response body was not valid JSON.
    """
    found: dict[tuple[str, str], list[dict]] = {}
    if not pkgs:
        # Nothing to ask about; skip the HTTP round trip entirely.
        return found

    # NOTE(review): OSV documents an upper bound on queries per batch request
    # (1000 at the time of writing - confirm against the API docs). Chunking
    # keeps large dependency graphs under that bound.
    batch_size = 1000

    for start in range(0, len(pkgs), batch_size):
        batch = pkgs[start:start + batch_size]
        payload = {
            "queries": [
                {
                    "package": {"ecosystem": "crates.io", "name": p["name"]},
                    "version": p["version"],
                }
                for p in batch
            ]
        }
        req = urllib.request.Request(
            "https://api.osv.dev/v1/querybatch",
            data=json.dumps(payload).encode(),
            headers={"Content-Type": "application/json"},
        )
        # The context manager closes the HTTP response (and its socket) even
        # when JSON decoding fails.
        with urllib.request.urlopen(req, timeout=60) as resp:
            res = json.load(resp)

        # Results are matched to queries by position; zip() also guards
        # against a response that carries more entries than were sent.
        for p, r in zip(batch, res.get("results", [])):
            if r.get("vulns"):
                found[(p["name"], p["version"])] = r["vulns"]
    return found
def purl(name: str, version: str) -> str:
    """Build the ``pkg:cargo/<name>@<version>`` identifier for a crate.

    The two values are inserted verbatim; no escaping is applied.
    """
    return "pkg:cargo/{}@{}".format(name, version)
def _parse_cli(argv: list[str]) -> "tuple[str, str | None] | None":
    """Extract ``(repo, out)`` from *argv*, or ``None`` if it is unusable.

    The first non-option argument is the repository path; ``--out FILE`` may
    appear before or after it. ``None`` is returned when no repository is
    given or when ``--out`` is not followed by a value.
    """
    repo = None
    out = None
    args = iter(argv[1:])
    for arg in args:
        if arg == "--out":
            out = next(args, None)
            if out is None:
                # "--out" was the last argument: there is no file name.
                return None
        elif repo is None:
            repo = arg
    if repo is None:
        return None
    return repo, out


def _build_sbom(
    repo_name: str,
    pkgs: list[dict],
    vulns: dict[tuple[str, str], list[dict]],
) -> dict:
    """Assemble the CycloneDX 1.5 document for *pkgs* and their *vulns*."""
    components = []
    for p in pkgs:
        ref = purl(p["name"], p["version"])
        components.append(
            {
                "type": "library",
                # Each vulnerability's "affects[].ref" below points at a
                # component by its bom-ref, so the component must declare one.
                "bom-ref": ref,
                "name": p["name"],
                "version": p["version"],
                "purl": ref,
                "licenses": [{"license": {"name": p["license"]}}],
            }
        )

    # NOTE(review): batch query results may carry only minimal records, in
    # which case "summary" is absent and the description stays empty -
    # confirm against the OSV API docs.
    vulnerabilities = [
        {
            "id": v.get("id"),
            "source": {"name": "OSV", "url": f"https://osv.dev/vulnerability/{v.get('id')}"},
            "affects": [{"ref": purl(name, ver)}],
            "description": v.get("summary", ""),
        }
        for (name, ver), vs in vulns.items()
        for v in vs
    ]

    return {
        "bomFormat": "CycloneDX",
        "specVersion": "1.5",
        "version": 1,
        "metadata": {"component": {"type": "application", "name": repo_name}},
        "components": components,
        "vulnerabilities": vulnerabilities,
    }


def main() -> int:
    """Generate an SBOM and vulnerability summary for a Cargo project.

    Reads ``sys.argv``: a repository path and an optional ``--out FILE``.
    Collects the packages reported by ``cargo_metadata``, queries
    ``osv_vulns`` for them, prints a summary to standard output and, when
    ``--out`` is given, writes the CycloneDX JSON document to that file.

    Returns:
        ``2`` when the command line is unusable; otherwise the number of
        packages with at least one vulnerability, capped at 255 so it is
        safe to use as a process exit status.
    """
    parsed = _parse_cli(sys.argv)
    if parsed is None:
        # The module may have no docstring, in which case printing __doc__
        # would just print "None"; fall back to a real usage line.
        print(__doc__ or "usage: <script> <repo> [--out FILE]", file=sys.stderr)
        return 2
    repo, out = parsed

    repo_path = Path(repo)
    repo_name = repo_path.name
    if repo_name in ("", ".."):
        # Path(".").name is empty and Path("..").name is ".."; resolve the
        # path so the SBOM's root component gets a real directory name.
        repo_name = repo_path.resolve().name

    md = cargo_metadata(repo)
    pkgs = [
        {"name": p["name"], "version": p["version"], "license": p.get("license") or "NOASSERTION"}
        for p in md["packages"]
    ]
    vulns = osv_vulns(pkgs)
    sbom = _build_sbom(repo_name, pkgs, vulns)

    # Tally how many packages declare each license string.
    lic: dict[str, int] = {}
    for p in pkgs:
        lic[p["license"]] = lic.get(p["license"], 0) + 1

    if out:
        Path(out).write_text(json.dumps(sbom, indent=2), encoding="utf-8")

    print(f"repo {repo_name}")
    print(f"components {len(pkgs)} crates (deep, incl. transitive)")
    print(f"vulnerabilities {sum(len(v) for v in vulns.values())} across {len(vulns)} crates")
    for (name, ver), vs in sorted(vulns.items()):
        # str() keeps the report printable even if a record lacks an "id".
        print(f" ⚠ {name} {ver}: {', '.join(str(v.get('id')) for v in vs)}")
    # Show only the six most common licenses.
    print("licenses " + ", ".join(f"{k}×{n}" for k, n in sorted(lic.items(), key=lambda x: -x[1])[:6]))
    if out:
        print(f"CycloneDX SBOM {out}")

    # POSIX exit statuses are 8 bits wide: an uncapped count of 256 would
    # wrap around to 0 and report success.
    return min(len(vulns), 255)
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())