nornir 0.4.12

Companion to cargo: dependency tracking, release gating, deploy, benchmarks, and documentation assembly. Project-agnostic.
Documentation
#!/usr/bin/env python3
"""nornir SBOM + vulnerability + license producer.

Pure-stdlib, zero external tools: `cargo metadata` gives the full deep dependency
tree; the OSV.dev batch API gives vulnerabilities; licenses come from the
manifests. Emits a **CycloneDX 1.5** SBOM plus an OSV-style vuln report — the
standards-based feed for the warehouse `sbom_components` / `vuln_findings` /
`license_facts` tables and the viz 🛡 Security tab.

    scripts/sbom-scan.py <repo-dir> [--out sbom.cdx.json]

Exit code is the number of vulnerable crates (0 = clean), for CI gating.
"""
import json
import subprocess
import sys
import urllib.request
from pathlib import Path


def cargo_metadata(repo: str) -> dict:
    out = subprocess.run(
        ["cargo", "metadata", "--format-version", "1"],
        cwd=repo, capture_output=True, text=True, check=True,
    )
    return json.loads(out.stdout)


def osv_vulns(pkgs: list[dict]) -> dict[tuple[str, str], list[dict]]:
    queries = [
        {"package": {"ecosystem": "crates.io", "name": p["name"]}, "version": p["version"]}
        for p in pkgs
    ]
    req = urllib.request.Request(
        "https://api.osv.dev/v1/querybatch",
        data=json.dumps({"queries": queries}).encode(),
        headers={"Content-Type": "application/json"},
    )
    res = json.load(urllib.request.urlopen(req, timeout=60))
    found: dict[tuple[str, str], list[dict]] = {}
    for i, r in enumerate(res.get("results", [])):
        if r.get("vulns"):
            found[(pkgs[i]["name"], pkgs[i]["version"])] = r["vulns"]
    return found


def purl(name: str, version: str) -> str:
    return f"pkg:cargo/{name}@{version}"


def main() -> int:
    if len(sys.argv) < 2:
        print(__doc__)
        return 2
    repo = sys.argv[1]
    out = None
    if "--out" in sys.argv:
        out = sys.argv[sys.argv.index("--out") + 1]

    md = cargo_metadata(repo)
    pkgs = [
        {"name": p["name"], "version": p["version"], "license": p.get("license") or "NOASSERTION"}
        for p in md["packages"]
    ]
    vulns = osv_vulns(pkgs)

    # CycloneDX 1.5 SBOM
    sbom = {
        "bomFormat": "CycloneDX",
        "specVersion": "1.5",
        "version": 1,
        "metadata": {"component": {"type": "application", "name": Path(repo).name}},
        "components": [
            {
                "type": "library",
                "name": p["name"],
                "version": p["version"],
                "purl": purl(p["name"], p["version"]),
                "licenses": [{"license": {"name": p["license"]}}],
            }
            for p in pkgs
        ],
        "vulnerabilities": [
            {
                "id": v.get("id"),
                "source": {"name": "OSV", "url": f"https://osv.dev/vulnerability/{v.get('id')}"},
                "affects": [{"ref": purl(name, ver)}],
                "description": v.get("summary", ""),
            }
            for (name, ver), vs in vulns.items()
            for v in vs
        ],
    }

    # license tally
    lic: dict[str, int] = {}
    for p in pkgs:
        lic[p["license"]] = lic.get(p["license"], 0) + 1

    if out:
        Path(out).write_text(json.dumps(sbom, indent=2))

    print(f"repo            {Path(repo).name}")
    print(f"components      {len(pkgs)} crates (deep, incl. transitive)")
    print(f"vulnerabilities {sum(len(v) for v in vulns.values())} across {len(vulns)} crates")
    for (name, ver), vs in sorted(vulns.items()):
        print(f"{name} {ver}: {', '.join(v.get('id') for v in vs)}")
    print("licenses        " + ", ".join(f"{k}×{n}" for k, n in sorted(lic.items(), key=lambda x: -x[1])[:6]))
    if out:
        print(f"CycloneDX SBOM  {out}")
    return len(vulns)


if __name__ == "__main__":
    sys.exit(main())