cc-audit 3.2.14

Security auditor for Claude Code skills, hooks, and MCP servers
Documentation
#!/usr/bin/env python3
"""
CVE Database Updater for cc-audit

Fetches CVE data from NVD API 2.0 for AI coding tools and MCP-related products,
then updates the local CVE database.
"""

import json
import os
import sys
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
from urllib.parse import quote, urlencode
from urllib.request import Request, urlopen
from urllib.error import HTTPError, URLError

# NVD API configuration
# Base endpoint of the NVD CVE API 2.0; fetch_nvd_cves() appends the query string.
NVD_API_BASE = "https://services.nvd.nist.gov/rest/json/cves/2.0"
# Optional API key taken from the environment. It is None when the variable is
# unset, in which case requests go out without an "apiKey" header and the
# slower REQUEST_DELAY spacing is used.
NVD_API_KEY = os.environ.get("NVD_API_KEY")

# Keywords to search for AI coding tools CVEs
# main() issues one NVD keywordSearch request per entry in this list.
SEARCH_KEYWORDS = [
    "mcp model context protocol",
    "claude code",
    "cursor ide",
    "github copilot",
    "codeium",
    "tabnine",
    "amazon q developer",
    "windsurf",
    "continue dev",
]

# Known vendor/product mappings for AI coding tools
# Keys are lowercase vendor names and values are product-name fragments.
# extract_affected_products() treats a CPE as relevant when its vendor field
# equals a key or its product field contains one of the fragments.
KNOWN_PRODUCTS = {
    "anthropic": ["claude-code", "claude-code-vscode", "claude-code-jetbrains"],
    "modelcontextprotocol": ["mcp", "mcp-inspector", "mcp-remote", "mcp-server"],
    "cursor": ["cursor"],
    "github": ["copilot", "copilot-chat"],
    "microsoft": ["copilot"],
    "codeium": ["codeium"],
    "tabnine": ["tabnine"],
    "amazon": ["q-developer", "codewhisperer"],
    "continue": ["continue"],
}

# Rate limiting
# The delays spread the allowed request budget evenly over the 30-second
# window: 30 s / 5 requests = 6.0 s, and 30 s / 50 requests = 0.6 s.
REQUEST_DELAY = 6.0  # seconds between requests (NVD allows 5 requests per 30 seconds without API key)
REQUEST_DELAY_WITH_KEY = 0.6  # with API key: 50 requests per 30 seconds


def get_request_delay() -> float:
    """Return the pause, in seconds, to leave between NVD requests.

    The shorter delay applies when an API key is configured; otherwise the
    unauthenticated delay is returned.
    """
    if NVD_API_KEY:
        return REQUEST_DELAY_WITH_KEY
    return REQUEST_DELAY


def fetch_nvd_cves(keyword: str, days_back: int = 30) -> list[dict[str, Any]]:
    """Query the NVD CVE API for CVEs published recently that match *keyword*.

    Args:
        keyword: Text sent as the ``keywordSearch`` query parameter.
        days_back: Size of the publication window, counted back from now (UTC).

    Returns:
        The ``vulnerabilities`` list from the JSON response, or an empty list
        when that key is absent. HTTP, URL, timeout and JSON-decoding failures
        are reported on stderr and also yield an empty list.
    """
    timestamp_format = "%Y-%m-%dT%H:%M:%S.000"
    window_end = datetime.now(timezone.utc)
    window_start = window_end - timedelta(days=days_back)

    query = urlencode(
        {
            "keywordSearch": keyword,
            "pubStartDate": window_start.strftime(timestamp_format),
            "pubEndDate": window_end.strftime(timestamp_format),
        }
    )

    request_headers = {"Accept": "application/json"}
    if NVD_API_KEY:
        # The key is only sent when one is configured.
        request_headers["apiKey"] = NVD_API_KEY

    try:
        request = Request(f"{NVD_API_BASE}?{query}", headers=request_headers)
        with urlopen(request, timeout=30) as response:
            payload = json.loads(response.read().decode())
        return payload.get("vulnerabilities", [])
    except HTTPError as e:
        # HTTPError is a URLError subclass, so it has to be handled first.
        print(f"HTTP Error fetching CVEs for '{keyword}': {e.code} {e.reason}", file=sys.stderr)
    except URLError as e:
        print(f"URL Error fetching CVEs for '{keyword}': {e.reason}", file=sys.stderr)
    except TimeoutError as e:
        print(f"Timeout fetching CVEs for '{keyword}': {e}", file=sys.stderr)
    except json.JSONDecodeError as e:
        print(f"JSON decode error for '{keyword}': {e}", file=sys.stderr)

    # Every handled failure falls through to the same best-effort result.
    return []


def parse_cvss_severity(cve_data: dict[str, Any]) -> tuple[str, float | None]:
    """Extract severity and CVSS score from CVE data."""
    metrics = cve_data.get("metrics", {})

    # Try CVSS 3.1 first, then 3.0, then 2.0
    for version in ["cvssMetricV31", "cvssMetricV30", "cvssMetricV2"]:
        if version in metrics and metrics[version]:
            metric = metrics[version][0]
            cvss_data = metric.get("cvssData", {})
            score = cvss_data.get("baseScore")
            severity = cvss_data.get("baseSeverity", "").lower()

            if severity in ["critical", "high", "medium", "low"]:
                return severity, score

    return "medium", None


def extract_affected_products(cve_data: dict[str, Any]) -> list[dict[str, Any]]:
    """Extract the relevant affected products from a CVE's CPE configurations.

    Every ``cpeMatch`` entry under ``configurations[*].nodes[*]`` is examined.
    An entry is kept when its CPE vendor equals a key of ``KNOWN_PRODUCTS`` or
    its CPE product contains one of the known product fragments (both
    case-insensitive). Entries explicitly flagged ``"vulnerable": false``
    are skipped, and identical result rows are emitted only once.

    Args:
        cve_data: The ``cve`` object of an NVD vulnerability record.

    Returns:
        A list of dicts with the keys ``vendor``, ``product``,
        ``version_affected`` (``"< X"``, ``"<= X"`` or ``"*"``) and
        ``version_fixed`` (``X`` for an exclusive upper bound, else ``None``).
    """
    products: list[dict[str, Any]] = []
    seen = set()

    for config in cve_data.get("configurations", []):
        for node in config.get("nodes", []):
            for cpe_match in node.get("cpeMatch", []):
                # A match flagged as not vulnerable only describes the
                # environment (e.g. the host platform), not an affected product.
                # A missing flag is treated as vulnerable.
                if cpe_match.get("vulnerable") is False:
                    continue

                # Parse CPE: cpe:2.3:a:vendor:product:version:...
                parts = (cpe_match.get("criteria") or "").split(":")
                if len(parts) < 5:
                    continue
                vendor, product = parts[3], parts[4]
                vendor_lc, product_lc = vendor.lower(), product.lower()

                is_relevant = any(
                    vendor_lc == known_vendor
                    or any(p.lower() in product_lc for p in known_products)
                    for known_vendor, known_products in KNOWN_PRODUCTS.items()
                )
                if not is_relevant:
                    continue

                version_end = cpe_match.get("versionEndExcluding")
                version_end_incl = cpe_match.get("versionEndIncluding")

                if version_end:
                    # An exclusive upper bound is itself the first fixed version.
                    version_affected = f"< {version_end}"
                    version_fixed = version_end
                elif version_end_incl:
                    version_affected = f"<= {version_end_incl}"
                    version_fixed = None
                else:
                    version_affected = "*"
                    version_fixed = None

                # The same CPE range is often repeated across configurations;
                # report each distinct row once, in first-seen order.
                row_key = (vendor, product, version_affected, version_fixed)
                if row_key in seen:
                    continue
                seen.add(row_key)

                products.append({
                    "vendor": vendor,
                    "product": product,
                    "version_affected": version_affected,
                    "version_fixed": version_fixed,
                })

    return products


def extract_cwe_ids(cve_data: dict[str, Any]) -> list[str]:
    """Extract the distinct CWE IDs from a CVE record.

    Only weakness description values starting with ``"CWE-"`` are kept, so
    placeholders such as ``"NVD-CWE-noinfo"`` are ignored.

    Args:
        cve_data: The ``cve`` object of an NVD vulnerability record.

    Returns:
        Unique CWE IDs in order of first appearance. The order is
        deterministic, so repeated runs serialise the same record identically.
    """
    # A dict is used as an ordered set: keys keep insertion order.
    ordered_ids: dict[str, None] = {}

    for weakness in cve_data.get("weaknesses", []):
        for desc in weakness.get("description", []):
            value = desc.get("value") or ""  # tolerate an explicit null
            if value.startswith("CWE-"):
                ordered_ids[value] = None

    return list(ordered_ids)


def extract_references(cve_data: dict[str, Any]) -> list[str]:
    """Collect reference URLs for a CVE, led by its NVD detail page.

    Only the first five reference records are considered, and records without
    a URL are dropped. If the record has an ID, the matching NVD detail URL is
    placed at the front unless it is already among the collected links.
    """
    leading_records = cve_data.get("references", [])[:5]
    links = [
        link
        for link in (record.get("url", "") for record in leading_records)
        if link
    ]

    identifier = cve_data.get("id", "")
    if not identifier:
        return links

    detail_page = f"https://nvd.nist.gov/vuln/detail/{identifier}"
    if detail_page in links:
        return links
    return [detail_page, *links]


def convert_to_entry(vuln: dict[str, Any]) -> dict[str, Any] | None:
    """Convert one NVD vulnerability record to the local database format.

    Args:
        vuln: An item of the NVD ``vulnerabilities`` list; the CVE itself is
            read from its ``cve`` key.

    Returns:
        A database entry dict, or ``None`` when the record has no ID, no
        non-empty English description, or no relevant affected product.
        ``cvss_score`` is included only when a score is available.
    """
    cve = vuln.get("cve", {})
    cve_id = cve.get("id", "")
    if not cve_id:
        return None

    # Use the first English description; its absence (or emptiness) skips the CVE.
    description = next(
        (desc.get("value", "") for desc in cve.get("descriptions", []) if desc.get("lang") == "en"),
        "",
    )
    if not description:
        return None

    # Skip if no relevant products found
    affected_products = extract_affected_products(cve)
    if not affected_products:
        return None

    severity, cvss_score = parse_cvss_severity(cve)

    # Title = first sentence, capped at 100 characters. A sentence ends at a
    # period followed by whitespace or the end of the text; periods inside
    # version numbers ("1.0.20") or file names must not truncate the title.
    text_length = len(description)
    sentence_end = next(
        (
            index
            for index, char in enumerate(description)
            if char == "." and (index + 1 == text_length or description[index + 1].isspace())
        ),
        text_length,
    )
    title = description[:sentence_end]
    if len(title) > 100:
        title = title[:97] + "..."

    entry = {
        "id": cve_id,
        "title": title,
        "description": description,
        "severity": severity,
        "affected_products": affected_products,
        "cwe_ids": extract_cwe_ids(cve),
        "references": extract_references(cve),
        "published_at": cve.get("published", ""),
    }

    if cvss_score is not None:
        entry["cvss_score"] = cvss_score

    return entry


def load_existing_database(path: Path) -> dict[str, Any]:
    """Load the CVE database from *path*, or return an empty skeleton.

    Args:
        path: Location of the JSON database file.

    Returns:
        The parsed JSON document when the file exists; otherwise a fresh
        database dict with version ``"1.0.0"``, an empty ``updated_at`` and
        no entries.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
    """
    if not path.exists():
        return {
            "version": "1.0.0",
            "updated_at": "",
            "entries": [],
        }

    # JSON interchange is UTF-8; do not depend on the platform's locale encoding.
    with open(path, encoding="utf-8") as f:
        return json.load(f)


def merge_entries(
    existing: list[dict[str, Any]], new_entries: list[dict[str, Any]]
) -> list[dict[str, Any]]:
    """Merge new entries into the existing ones without duplicating IDs.

    An entry from *new_entries* is added only if its ``id`` is neither in
    *existing* nor already added earlier in the same call. Each addition is
    announced on stdout. Neither input list is modified.

    Args:
        existing: Entries already in the database.
        new_entries: Candidate entries; each must have an ``id`` key.

    Returns:
        A new list sorted by ``published_at`` descending (newest first);
        entries with a missing or null date sort last.
    """
    seen_ids = {e["id"] for e in existing}
    merged = list(existing)

    for entry in new_entries:
        entry_id = entry["id"]
        if entry_id in seen_ids:
            continue
        # Record the ID immediately so a repeat later in new_entries is skipped too.
        seen_ids.add(entry_id)
        merged.append(entry)
        print(f"  Added new CVE: {entry_id}")

    # Sort by published date (newest first); `or ""` keeps a null date comparable.
    merged.sort(key=lambda e: e.get("published_at") or "", reverse=True)

    return merged


def main() -> int:
    """Fetch recent CVEs for every search keyword and update the database.

    The database lives at ``<repo root>/data/cve-database.json``, where the
    repo root is the parent of this script's directory. New entries are
    merged in, ``updated_at`` is refreshed, and the last component of the
    version string is incremented. Nothing is written when no new CVE is found.

    Returns:
        The process exit status, which is 0 on every normal path.
    """
    # Determine database path
    script_dir = Path(__file__).parent
    db_path = script_dir.parent / "data" / "cve-database.json"

    print("CVE Database Updater")
    print(f"Database path: {db_path}")
    print(f"NVD API Key: {'configured' if NVD_API_KEY else 'not configured (rate limited)'}")
    print()

    # Load existing database
    database = load_existing_database(db_path)
    existing_entries = database.get("entries", [])
    print(f"Existing entries: {len(existing_entries)}")

    # Fetch new CVEs
    all_new_entries = []
    delay = get_request_delay()

    for index, keyword in enumerate(SEARCH_KEYWORDS):
        if index:
            # Rate limiting: pause between requests only, not after the last one.
            time.sleep(delay)

        print(f"Searching for: {keyword}")
        vulns = fetch_nvd_cves(keyword, days_back=90)
        print(f"  Found {len(vulns)} results")

        for vuln in vulns:
            entry = convert_to_entry(vuln)
            if entry:
                all_new_entries.append(entry)

    # Deduplicate new entries: different keywords can return the same CVE,
    # and the first occurrence is the one kept.
    first_by_id: dict[str, dict] = {}
    for entry in all_new_entries:
        first_by_id.setdefault(entry["id"], entry)
    unique_new = list(first_by_id.values())

    print(f"\nNew unique entries found: {len(unique_new)}")

    # Merge with existing
    merged = merge_entries(existing_entries, unique_new)

    if len(merged) == len(existing_entries):
        print("\nNo new CVEs to add.")
        return 0

    # Update database
    database["entries"] = merged
    database["updated_at"] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    # Increment patch version for database
    version_parts = database.get("version", "1.0.0").split(".")
    version_parts[-1] = str(int(version_parts[-1]) + 1)
    database["version"] = ".".join(version_parts)

    # Write updated database. A missing database file is a supported starting
    # state, so its directory may not exist yet either.
    db_path.parent.mkdir(parents=True, exist_ok=True)
    with open(db_path, "w", encoding="utf-8") as f:
        json.dump(database, f, indent=2)
        f.write("\n")

    print("\nDatabase updated!")
    print(f"  Version: {database['version']}")
    print(f"  Total entries: {len(merged)}")
    print(f"  New entries: {len(merged) - len(existing_entries)}")

    return 0


if __name__ == "__main__":
    # Hand main()'s return value to the interpreter as the exit status.
    raise SystemExit(main())