import json
import os
import sys
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
from urllib.parse import urlencode
from urllib.request import Request, urlopen
from urllib.error import HTTPError, URLError
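# NVD CVE API 2.0 endpoint; an API key (NVD_API_KEY env var) is optional but raises the allowed request rate.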
NVD_API_BASE = "https://services.nvd.nist.gov/rest/json/cves/2.0"
NVD_API_KEY = os.environ.get("NVD_API_KEY")
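# Keyword queries passed to the NVD keywordSearch parameter, targeting AI coding assistants and MCP tooling.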
SEARCH_KEYWORDS = [
"mcp model context protocol",
"claude code",
"cursor ide",
"github copilot",
"codeium",
"tabnine",
"amazon q developer",
"windsurf",
"continue dev",
]
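# CPE vendor/product names used to filter CVE configurations down to relevant tools.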
KNOWN_PRODUCTS = {
"anthropic": ["claude-code", "claude-code-vscode", "claude-code-jetbrains"],
"modelcontextprotocol": ["mcp", "mcp-inspector", "mcp-remote", "mcp-server"],
"cursor": ["cursor"],
"github": ["copilot", "copilot-chat"],
"microsoft": ["copilot"],
"codeium": ["codeium"],
"tabnine": ["tabnine"],
"amazon": ["q-developer", "codewhisperer"],
"continue": ["continue"],
}
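# Seconds to wait between requests. NVD's published guidance is roughly 5 requests
# per rolling 30-second window without an API key and 50 with one.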
REQUEST_DELAY = 6.0
REQUEST_DELAY_WITH_KEY = 0.6
def get_request_delay() -> float:
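    """Return the delay between NVD requests, shorter when an API key is configured."""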
return REQUEST_DELAY_WITH_KEY if NVD_API_KEY else REQUEST_DELAY
def fetch_nvd_cves(keyword: str, days_back: int = 30) -> list[dict[str, Any]]:
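    """Query the NVD CVE API for a keyword, limited to CVEs published in the last
    ``days_back`` days, and return the raw vulnerability objects ([] on error).
    Keep ``days_back`` within NVD's maximum publication date range (about 120 days).
    """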
end_date = datetime.now(timezone.utc)
start_date = end_date - timedelta(days=days_back)
params = {
"keywordSearch": keyword,
"pubStartDate": start_date.strftime("%Y-%m-%dT%H:%M:%S.000"),
"pubEndDate": end_date.strftime("%Y-%m-%dT%H:%M:%S.000"),
}
query_string = urlencode(params)
url = f"{NVD_API_BASE}?{query_string}"
headers = {"Accept": "application/json"}
if NVD_API_KEY:
headers["apiKey"] = NVD_API_KEY
try:
req = Request(url, headers=headers)
with urlopen(req, timeout=30) as response:
data = json.loads(response.read().decode())
return data.get("vulnerabilities", [])
except HTTPError as e:
print(f"HTTP Error fetching CVEs for '{keyword}': {e.code} {e.reason}", file=sys.stderr)
return []
except URLError as e:
print(f"URL Error fetching CVEs for '{keyword}': {e.reason}", file=sys.stderr)
return []
except TimeoutError as e:
print(f"Timeout fetching CVEs for '{keyword}': {e}", file=sys.stderr)
return []
except json.JSONDecodeError as e:
print(f"JSON decode error for '{keyword}': {e}", file=sys.stderr)
return []
def parse_cvss_severity(cve_data: dict[str, Any]) -> tuple[str, float | None]:
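    """Pick the highest-priority CVSS metric available (v3.1, then v3.0, then v2)
    and return (severity, base score); defaults to ("medium", None) when absent.
    """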
metrics = cve_data.get("metrics", {})
for version in ["cvssMetricV31", "cvssMetricV30", "cvssMetricV2"]:
if version in metrics and metrics[version]:
metric = metrics[version][0]
cvss_data = metric.get("cvssData", {})
score = cvss_data.get("baseScore")
            # CVSS v2 records baseSeverity at the metric level rather than inside cvssData.
            severity = (cvss_data.get("baseSeverity") or metric.get("baseSeverity") or "").lower()
if severity in ["critical", "high", "medium", "low"]:
return severity, score
return "medium", None
def extract_affected_products(cve_data: dict[str, Any]) -> list[dict[str, Any]]:
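    """Walk the CVE's CPE configurations and return affected products matching
    KNOWN_PRODUCTS, with affected/fixed version info derived from each CPE match.
    """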
products = []
configurations = cve_data.get("configurations", [])
for config in configurations:
for node in config.get("nodes", []):
for cpe_match in node.get("cpeMatch", []):
cpe = cpe_match.get("criteria", "")
parts = cpe.split(":")
if len(parts) >= 5:
vendor = parts[3]
product = parts[4]
is_relevant = False
for known_vendor, known_products in KNOWN_PRODUCTS.items():
if vendor.lower() == known_vendor or any(
p.lower() in product.lower() for p in known_products
):
is_relevant = True
break
if is_relevant:
version_end = cpe_match.get("versionEndExcluding")
version_end_incl = cpe_match.get("versionEndIncluding")
if version_end:
version_affected = f"< {version_end}"
version_fixed = version_end
elif version_end_incl:
version_affected = f"<= {version_end_incl}"
version_fixed = None
else:
version_affected = "*"
version_fixed = None
products.append({
"vendor": vendor,
"product": product,
"version_affected": version_affected,
"version_fixed": version_fixed,
})
return products
def extract_cwe_ids(cve_data: dict[str, Any]) -> list[str]:
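    """Collect deduplicated CWE identifiers (e.g. "CWE-79") from the CVE's weaknesses."""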
cwe_ids = []
weaknesses = cve_data.get("weaknesses", [])
for weakness in weaknesses:
for desc in weakness.get("description", []):
value = desc.get("value", "")
if value.startswith("CWE-"):
cwe_ids.append(value)
    # Sort for deterministic output so the written database does not churn between runs.
    return sorted(set(cwe_ids))
def extract_references(cve_data: dict[str, Any]) -> list[str]:
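    """Return up to the first five reference URLs, with the CVE's NVD detail page prepended."""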
refs = []
references = cve_data.get("references", [])
    for ref in references[:5]:
        url = ref.get("url", "")
if url:
refs.append(url)
cve_id = cve_data.get("id", "")
if cve_id:
nvd_url = f"https://nvd.nist.gov/vuln/detail/{cve_id}"
if nvd_url not in refs:
refs.insert(0, nvd_url)
return refs
def convert_to_entry(vuln: dict[str, Any]) -> dict[str, Any] | None:
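    """Convert a raw NVD vulnerability object into a database entry.

    Returns None when the CVE lacks an ID, an English description, or any
    affected products matching KNOWN_PRODUCTS.
    """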
cve = vuln.get("cve", {})
cve_id = cve.get("id", "")
if not cve_id:
return None
descriptions = cve.get("descriptions", [])
description = ""
for desc in descriptions:
if desc.get("lang") == "en":
description = desc.get("value", "")
break
if not description:
return None
severity, cvss_score = parse_cvss_severity(cve)
affected_products = extract_affected_products(cve)
if not affected_products:
return None
cwe_ids = extract_cwe_ids(cve)
references = extract_references(cve)
title = description.split(".")[0]
if len(title) > 100:
title = title[:97] + "..."
published = cve.get("published", "")
entry = {
"id": cve_id,
"title": title,
"description": description,
"severity": severity,
"affected_products": affected_products,
"cwe_ids": cwe_ids,
"references": references,
"published_at": published,
}
if cvss_score is not None:
entry["cvss_score"] = cvss_score
return entry
def load_existing_database(path: Path) -> dict[str, Any]:
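    """Load the CVE database from disk, or return an empty skeleton if the file is missing."""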
if path.exists():
with open(path) as f:
return json.load(f)
return {
"version": "1.0.0",
"updated_at": "",
"entries": [],
}
def merge_entries(
existing: list[dict[str, Any]], new_entries: list[dict[str, Any]]
) -> list[dict[str, Any]]:
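    """Append new entries whose IDs are not already present, then sort by publication date (newest first)."""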
existing_ids = {e["id"] for e in existing}
merged = list(existing)
for entry in new_entries:
if entry["id"] not in existing_ids:
merged.append(entry)
print(f" Added new CVE: {entry['id']}")
merged.sort(key=lambda e: e.get("published_at", ""), reverse=True)
return merged
def main() -> int:
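    """Fetch recent CVEs for each keyword, merge them into the database, and bump
    the patch version and timestamp if anything was added.
    """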
script_dir = Path(__file__).parent
repo_root = script_dir.parent
db_path = repo_root / "data" / "cve-database.json"
print(f"CVE Database Updater")
print(f"Database path: {db_path}")
print(f"NVD API Key: {'configured' if NVD_API_KEY else 'not configured (rate limited)'}")
print()
database = load_existing_database(db_path)
existing_entries = database.get("entries", [])
print(f"Existing entries: {len(existing_entries)}")
all_new_entries = []
delay = get_request_delay()
for keyword in SEARCH_KEYWORDS:
print(f"Searching for: {keyword}")
vulns = fetch_nvd_cves(keyword, days_back=90)
print(f" Found {len(vulns)} results")
for vuln in vulns:
entry = convert_to_entry(vuln)
if entry:
all_new_entries.append(entry)
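        # Respect NVD rate limits between keyword queries.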
time.sleep(delay)
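    # Deduplicate across keyword searches; the same CVE can match more than one keyword.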
seen_ids = set()
unique_new = []
for entry in all_new_entries:
if entry["id"] not in seen_ids:
seen_ids.add(entry["id"])
unique_new.append(entry)
print(f"\nNew unique entries found: {len(unique_new)}")
merged = merge_entries(existing_entries, unique_new)
if len(merged) == len(existing_entries):
print("\nNo new CVEs to add.")
return 0
database["entries"] = merged
database["updated_at"] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
version_parts = database.get("version", "1.0.0").split(".")
version_parts[-1] = str(int(version_parts[-1]) + 1)
database["version"] = ".".join(version_parts)
with open(db_path, "w") as f:
json.dump(database, f, indent=2)
f.write("\n")
print(f"\nDatabase updated!")
print(f" Version: {database['version']}")
print(f" Total entries: {len(merged)}")
print(f" New entries: {len(merged) - len(existing_entries)}")
return 0
if __name__ == "__main__":
sys.exit(main())