from __future__ import annotations
import argparse
import csv
import io
import json
import os
import sys
import time
import urllib.error
import urllib.request
import zipfile
# Value sent as the User-Agent header by fetch_bytes(); names the tool and
# points at the project page.
USER_AGENT = " ".join(
    (
        "forensicnomicon-urlhaus-sync/0.1",
        "(+https://github.com/SecurityRonin/forensicnomicon)",
    )
)

# Default download location of the URLhaus CSV feed (overridable via --url).
CSV_URL = "https://urlhaus.abuse.ch/downloads/csv_recent/"

# Lower-cased threat/tag values that make a row eligible for the snapshot.
# A row is kept when its "threat" column or any of its tags is in this set.
C2_THREAT_TAGS = {
    "c2",
    "c&c",
    "botnet_cc",
    "malspam",
    "malware_download",
    "payload_delivery",
}

# Lower-cased "url_status" values that count as currently active.
ACTIVE_STATUSES = {"online"}
def fetch_bytes(url: str, retries: int = 2) -> bytes:
    """Download *url* and return the raw response body.

    The request carries the module's ``USER_AGENT`` header and every attempt
    uses a 30-second socket timeout. Failed attempts are retried with an
    exponential pause (1 s, 2 s, ...) between them.

    Args:
        url: Address to fetch.
        retries: Total number of attempts. Values below 1 are treated as 1
            so that at least one request is always issued.

    Returns:
        The complete response body.

    Raises:
        RuntimeError: If every attempt fails; chained from the last error.
    """
    request = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
    attempts = max(1, retries)
    last_exc: Exception | None = None
    for attempt in range(attempts):
        try:
            with urllib.request.urlopen(request, timeout=30) as resp:
                return resp.read()
        except OSError as exc:
            # URLError/HTTPError are OSError subclasses. Catching OSError
            # also covers timeouts and connection resets raised while the
            # body is being read, which urlopen() does not wrap in URLError.
            last_exc = exc
            if attempt < attempts - 1:
                time.sleep(2 ** attempt)
    raise RuntimeError(f"Failed to fetch {url}: {last_exc}") from last_exc
def extract_domain(url_str: str) -> str | None:
    """Return the lower-cased host of an http(s) URL, or ``None``.

    Parsing is delegated to ``urllib.parse.urlsplit`` so that credentials
    (``user:pw@host``), ports and bracketed IPv6 literals are handled
    correctly; a naive split on ``":"`` would return the username or ``"["``
    for such URLs.

    Args:
        url_str: URL text to inspect.

    Returns:
        The host name without port, credentials or IPv6 brackets, or
        ``None`` when the scheme is not http/https, the URL has no host, or
        the URL is malformed.
    """
    from urllib.parse import urlsplit

    try:
        parts = urlsplit(url_str.strip())
        host = parts.hostname
    except ValueError:
        # urlsplit rejects malformed authorities such as an unclosed "[".
        return None
    # urlsplit already lower-cases the scheme.
    if parts.scheme not in ("http", "https"):
        return None
    host = (host or "").strip()
    return host or None
def parse_urlhaus_csv(raw: bytes) -> list[dict]:
    """Parse a URLhaus CSV dump into one record per active malicious domain.

    URLhaus dumps start with a ``#``-prefixed preamble whose last line is the
    column header (``# id,dateadded,url,url_status,...``). That header is
    recovered from the preamble; if none is found there, the first
    non-comment line is used as the header instead.

    A row is kept when its ``url_status`` is in ``ACTIVE_STATUSES`` and its
    ``threat`` value or any of its tags is in ``C2_THREAT_TAGS``. Only the
    first qualifying row per domain is retained.

    Args:
        raw: CSV file content; decoded as UTF-8 with undecodable bytes
            replaced.

    Returns:
        A list of dicts with the keys ``domain``, ``threat``, ``tags``
        (sorted, empty entries removed), ``urlhaus_link`` and ``date_added``,
        in order of first appearance.
    """
    text = raw.decode("utf-8", errors="replace")

    commented_header: list[str] | None = None
    data_lines: list[str] = []
    seen_data = False
    for line in text.splitlines():
        if not line.startswith("#"):
            data_lines.append(line)
            seen_data = seen_data or bool(line.strip())
            continue
        # Only the preamble (comments before the first data row) can carry
        # the header, and the first matching comment wins.
        if seen_data or commented_header is not None:
            continue
        candidate = [
            field.strip().strip('"').lower()
            for field in line.lstrip("#").split(",")
        ]
        if {"url", "url_status"} <= set(candidate):
            commented_header = candidate

    if commented_header is not None:
        # If the file also repeats the header as a plain row, that row is
        # dropped below because its url_status is not an active status.
        reader = csv.DictReader(data_lines, fieldnames=commented_header)
    else:
        reader = csv.DictReader(data_lines)

    seen_domains: set[str] = set()
    records: list[dict] = []
    for row in reader:
        status = (row.get("url_status") or "").strip().lower()
        if status not in ACTIVE_STATUSES:
            continue

        threat = (row.get("threat") or "").strip().lower()
        raw_tags = (row.get("tags") or "").strip().lower()
        tag_set = {t.strip() for t in raw_tags.split(",")} | {threat}
        if not (tag_set & C2_THREAT_TAGS):
            continue

        url_str = (row.get("url") or "").strip().strip('"')
        domain = extract_domain(url_str)
        if not domain or domain in seen_domains:
            continue

        seen_domains.add(domain)
        records.append({
            "domain": domain,
            "threat": threat,
            "tags": sorted(t for t in tag_set if t),
            "urlhaus_link": (row.get("urlhaus_link") or "").strip().strip('"'),
            "date_added": (row.get("dateadded") or "").strip().strip('"'),
        })
    return records
def main() -> int:
    """Command-line entry point: fetch, filter and store the URLhaus feed.

    Downloads the CSV (transparently unpacking it when the payload is a ZIP
    archive), extracts the active domains and either prints the first ten
    records (``--dry-run``) or writes all of them as JSON to ``--output``.
    Progress messages go to stderr.

    Returns:
        0 on success; 1 when the download fails or the ZIP archive contains
        no members.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Sync URLhaus active malware-distribution URLs to a local JSON snapshot. "
            "Source: https://urlhaus.abuse.ch/ "
            "Produces: archive/sources/urlhaus_domains.json"
        )
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="fetch and parse but print the first 10 entries; do not write",
    )
    parser.add_argument(
        "--output",
        default=None,
        help=(
            "output JSON path "
            "(default: archive/sources/urlhaus_domains.json relative to this script)"
        ),
    )
    parser.add_argument(
        "--url",
        default=CSV_URL,
        help=f"URLhaus CSV download URL (default: {CSV_URL})",
    )
    args = parser.parse_args()

    # The default output lives under the parent of this script's directory.
    script_dir = os.path.dirname(os.path.abspath(__file__))
    repo_root = os.path.dirname(script_dir)
    output_path = args.output or os.path.join(
        repo_root, "archive", "sources", "urlhaus_domains.json"
    )

    print(f"[urlhaus] Fetching {args.url} …", file=sys.stderr)
    try:
        raw = fetch_bytes(args.url)
    except RuntimeError as exc:
        # Report a failed download as a clean non-zero exit, not a traceback.
        print(f"[urlhaus] {exc}", file=sys.stderr)
        return 1

    # "PK" is the ZIP magic number; the feed may be served compressed.
    if raw[:2] == b"PK":
        print("[urlhaus] Decompressing ZIP …", file=sys.stderr)
        with zipfile.ZipFile(io.BytesIO(raw)) as zf:
            names = zf.namelist()
            if not names:
                print("[urlhaus] ZIP archive is empty.", file=sys.stderr)
                return 1
            # Prefer a .csv member; otherwise fall back to the first entry.
            csv_name = next((n for n in names if n.endswith(".csv")), names[0])
            raw = zf.read(csv_name)

    print("[urlhaus] Parsing CSV …", file=sys.stderr)
    records = parse_urlhaus_csv(raw)
    print(f"[urlhaus] Found {len(records)} active C2/malware-download domains.", file=sys.stderr)

    if args.dry_run:
        for rec in records[:10]:
            print(json.dumps(rec, indent=2))
        return 0

    # A bare filename has an empty dirname, and os.makedirs("") would raise.
    out_dir = os.path.dirname(output_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    with open(output_path, "w", encoding="utf-8") as fh:
        json.dump(records, fh, indent=2)
        fh.write("\n")
    print(f"[urlhaus] Written → {output_path}", file=sys.stderr)
    return 0
# Run the sync only when executed as a script, so that importing this module
# (for example to reuse the parser) does not trigger a download or exit.
if __name__ == "__main__":
    raise SystemExit(main())