from __future__ import annotations
import argparse
import email.utils
import json
import os
import re
import sys
import time
import urllib.error
import urllib.request
import xml.etree.ElementTree as ET
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Iterable
sys.path.insert(0, os.path.dirname(__file__))
from pending_lock import locked_write
# User-Agent header value sent with every HTTP request built in this module.
USER_AGENT = (
    "forensic-catalog-feed-watcher/0.3 "
    "(+https://github.com/SecurityRonin/forensicnomicon)"
)

# Namespace prefix map used for ElementTree lookups on Atom documents.
ATOM_NS = dict(atom="http://www.w3.org/2005/Atom")

# HTTP status codes that head_check() reports as "gone".
_GONE_CODES = frozenset((404, 410))

# Feed titles whose new entries filter_pending_entries() drops, so they are
# never appended to the pending-review file.
_NO_PENDING_REVIEW: frozenset[str] = frozenset(
    (
        "URLhaus",
        "MalwareBazaar",
        "ThreatFox",
        "LOLBAS Project (Windows)",
        "GTFOBins (Linux)",
        "LOOBins (macOS)",
        "LOLDrivers (BYOVD)",
        "LOFL Project (RMM C2 indicators)",
        "MISP Taxonomies",
    )
)
@dataclass
class FeedEntry:
    """A single item extracted from a feed or from a scraped web page."""

    # Entry title with surrounding whitespace removed by the parsers.
    title: str
    # Link to the entry; may be empty when the feed provides none.
    url: str
    # Timestamp text produced by parse_isoish(), or "" when unknown.
    published: str
@dataclass
class FeedSnapshot:
    """Outcome of checking one feed: its identity, entries and any error."""

    # Display name of the feed (taken from the OPML outline).
    title: str
    # Human-facing site URL; may be empty.
    html_url: str
    # URL that was actually fetched; also the key used in the state file.
    xml_url: str
    # UTC timestamp string recorded when the feed was checked.
    checked_at: str
    # Entries retained for this feed.
    entries: list[FeedEntry]
    # None on success, otherwise the text of the failure.
    error: str | None
def fetch(url: str) -> bytes:
    """Download *url* and return the raw response body.

    The request carries the module's ``USER_AGENT`` header and uses a
    30-second timeout.

    Raises:
        RuntimeError: ``"HTTP <code>"`` when the server answers with an HTTP
            error status, or the underlying reason text for any other
            ``urllib.error.URLError``.
    """
    req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            body = resp.read()
    except urllib.error.HTTPError as http_err:
        # HTTPError subclasses URLError, so it has to be handled first.
        raise RuntimeError(f"HTTP {http_err.code}") from http_err
    except urllib.error.URLError as url_err:
        raise RuntimeError(str(url_err.reason)) from url_err
    return body
def head_check(url: str, timeout: int = 15) -> tuple[str, str]:
    """Probe *url* with an HTTP HEAD request and classify the outcome.

    Args:
        url: Address to probe.
        timeout: Seconds passed to ``urlopen``.

    Returns:
        A ``(status, detail)`` pair. ``status`` is ``"ok"`` when the request
        succeeds, ``"gone"`` when the server answers with a code listed in
        ``_GONE_CODES``, and ``"skip"`` for any other HTTP error or any other
        exception. ``detail`` is ``"HTTP <code>"`` or the exception text.
    """
    try:
        probe = urllib.request.Request(
            url,
            method="HEAD",
            headers={"User-Agent": USER_AGENT},
        )
        with urllib.request.urlopen(probe, timeout=timeout) as reply:
            detail = f"HTTP {reply.getcode()}"
            return "ok", detail
    except urllib.error.HTTPError as http_err:
        verdict = "gone" if http_err.code in _GONE_CODES else "skip"
        return verdict, f"HTTP {http_err.code}"
    except Exception as err:
        # Best-effort probe: every other failure is reported as "skip"
        # rather than raised.
        return "skip", str(err)
def iter_feed_outlines(opml_path: str) -> Iterable[dict[str, str]]:
    """Yield one descriptor per usable ``<outline>`` element of an OPML file.

    Each descriptor has the keys ``title``, ``html_url``, ``xml_url`` and
    ``kind``. Outlines whose ``type`` attribute is ``web`` (case-insensitive)
    are yielded with ``kind="web"`` and their ``htmlUrl`` as the URL to
    fetch; every other outline is treated as ``kind="rss"`` and uses
    ``xmlUrl``. Outlines lacking the URL they need are skipped.
    """
    document = ET.parse(opml_path)
    for node in document.getroot().findall(".//outline"):
        attrs = node.attrib
        feed_url = attrs.get("xmlUrl", "")
        site_url = attrs.get("htmlUrl", "")
        # First non-empty of: text, title, feed URL, site URL.
        label = attrs.get("text") or attrs.get("title") or feed_url or site_url
        is_web = attrs.get("type", "rss").lower() == "web"
        target = site_url if is_web else feed_url
        if not target:
            continue
        yield {
            "title": label,
            "html_url": site_url,
            "xml_url": target,
            "kind": "web" if is_web else "rss",
        }
def parse_isoish(value: str) -> str:
    """Normalise a feed timestamp to ``datetime.isoformat()`` text if possible.

    The value is first tried as an RFC 2822 date (the RSS ``pubDate``
    format), then as an ISO 8601 string. Fractional seconds are dropped
    before the ISO parse. If neither parse succeeds the stripped input is
    returned unchanged.

    Args:
        value: Raw timestamp text; ``None`` or blank input is accepted.

    Returns:
        The ISO-formatted timestamp, the stripped original text when it
        cannot be parsed, or ``""`` for empty input.
    """
    value = (value or "").strip()
    if not value:
        return ""

    try:
        parsed = email.utils.parsedate_to_datetime(value)
    except (TypeError, ValueError, IndexError, OverflowError):
        # Not an RFC 2822 date (the exception type varies between Python
        # versions); fall through to the ISO 8601 attempt.
        parsed = None
    if parsed is not None:
        return parsed.isoformat()

    # Drop fractional seconds so any digit count parses on every version.
    normalised = re.sub(r"\.\d+", "", value)
    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11
    # on; spell the UTC designator as an explicit offset so the common Atom
    # form "...T10:30:00Z" is normalised on older interpreters too.
    normalised = re.sub(r"(?<=:\d\d)Z$", "+00:00", normalised)
    try:
        return datetime.fromisoformat(normalised).isoformat()
    except ValueError:
        return value
def parse_atom(root: ET.Element) -> list[FeedEntry]:
    """Convert the ``<entry>`` children of an Atom root into ``FeedEntry`` items.

    For each entry the timestamp is taken from ``<updated>``, falling back to
    ``<published>``, and the URL from the first ``<link>`` whose ``rel`` is
    ``alternate`` (a missing ``rel`` counts as ``alternate``). The URL is
    ``""`` when no such link exists.
    """

    def _text(node: ET.Element, tag: str) -> str:
        return node.findtext(tag, default="", namespaces=ATOM_NS)

    def _alternate_href(node: ET.Element) -> str:
        return next(
            (
                link.attrib.get("href", "")
                for link in node.findall("atom:link", ATOM_NS)
                if link.attrib.get("rel", "alternate") == "alternate"
            ),
            "",
        )

    return [
        FeedEntry(
            title=_text(node, "atom:title").strip(),
            url=_alternate_href(node),
            published=parse_isoish(
                _text(node, "atom:updated") or _text(node, "atom:published")
            ),
        )
        for node in root.findall("atom:entry", ATOM_NS)
    ]
def parse_rss(root: ET.Element) -> list[FeedEntry]:
    """Convert every ``<item>`` below *root* into a ``FeedEntry``.

    Title and link are stripped of surrounding whitespace; ``pubDate`` is
    passed through ``parse_isoish``. Missing children yield empty strings.
    """

    def _child_text(node: ET.Element, tag: str) -> str:
        return node.findtext(tag) or ""

    return [
        FeedEntry(
            title=_child_text(node, "title").strip(),
            url=_child_text(node, "link").strip(),
            published=parse_isoish(_child_text(node, "pubDate")),
        )
        for node in root.findall(".//item")
    ]
def _repair_rss_xml(payload: bytes) -> bytes:
    """Collapse directly nested ``<item><item>`` openings into one ``<item>``.

    Whitespace between the two tags is kept. The substitution is repeated
    until the payload stops changing, for at most 20 passes, because each
    pass only merges non-overlapping pairs.
    """
    doubled_open = re.compile(rb"<item>(\s*)<item>")
    current = payload
    passes_left = 20
    while passes_left > 0:
        collapsed = doubled_open.sub(rb"<item>\1", current)
        if collapsed == current:
            return current
        current = collapsed
        passes_left -= 1
    return current
def parse_feed(payload: bytes) -> list[FeedEntry]:
    """Parse an Atom or RSS document into a list of ``FeedEntry`` items.

    Leading whitespace is removed before parsing. If the XML is not
    well-formed, one more attempt is made after ``_repair_rss_xml``; a
    failure of that second attempt propagates as ``ET.ParseError``. A root
    tag ending in ``feed`` (case-insensitive) selects the Atom parser,
    anything else the RSS parser.
    """
    body = payload.lstrip()
    try:
        document = ET.fromstring(body)
    except ET.ParseError:
        document = ET.fromstring(_repair_rss_xml(body))

    looks_like_atom = document.tag.lower().endswith("feed")
    parser = parse_atom if looks_like_atom else parse_rss
    return parser(document)
def parse_html_links(payload: bytes, base_url: str) -> list[FeedEntry]:
    """Extract same-host article links from an HTML page.

    Anchors are found with a regular expression, resolved against
    *base_url*, stripped of fragment and trailing slash, and kept only when
    they stay on the host of *base_url*, do not look like
    navigation/taxonomy URLs, have at least one path segment and have not
    been seen before.

    The title is the anchor text with tags removed and whitespace collapsed.
    If that is empty or longer than 300 characters, a title derived from the
    last path segment is used instead.

    Args:
        payload: Raw page bytes; decoded as UTF-8 with replacement characters.
        base_url: URL the page was fetched from.

    Returns:
        Entries in document order, each with an empty ``published`` field.
    """
    from urllib.parse import urljoin, urlparse

    # Decode once; the previous code re-decoded the whole page for every link.
    html = payload.decode("utf-8", errors="replace")
    base_host = urlparse(base_url).netloc
    seen: set[str] = set()
    entries: list[FeedEntry] = []

    for match in re.finditer(r'<a\s[^>]*href=["\']([^"\']+)["\']', html, re.I):
        raw_href = match.group(1)
        href = raw_href.strip()
        if not href or href.startswith(("#", "javascript:")):
            continue

        full = urljoin(base_url, href).split("#")[0].rstrip("/")
        target = urlparse(full)
        if target.netloc != base_host:
            continue
        if re.search(
            r"/(tag|category|author|page|search|feed|rss|wp-|cdn-cgi|#)",
            target.path,
            re.I,
        ):
            continue
        segments = [s for s in target.path.split("/") if s]
        if not segments:
            continue
        if full in seen:
            continue
        seen.add(full)

        # Single slug-derived fallback so both fallback paths agree
        # (previously only one of them replaced underscores).
        slug_title = segments[-1].replace("-", " ").replace("_", " ").title()

        text_m = re.search(
            r'<a\s[^>]*href=["\']' + re.escape(raw_href) + r'["\'][^>]*>(.*?)</a>',
            html,
            re.I | re.S,
        )
        if text_m:
            # Collapse whitespace: multi-line anchor text would otherwise put
            # newlines into the one-line Markdown report entry.
            title = " ".join(re.sub(r"<[^>]+>", "", text_m.group(1)).split())
        else:
            title = slug_title
        if not title or len(title) > 300:
            title = slug_title

        entries.append(FeedEntry(title=title, url=full, published=""))
    return entries
def now_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``."""
    utc_now = time.gmtime()
    return time.strftime("%Y-%m-%dT%H:%M:%SZ", utc_now)
def today_str() -> str:
    """Return the current UTC date as ``YYYY-MM-DD``."""
    utc_now = time.gmtime()
    return time.strftime("%Y-%m-%d", utc_now)
def load_previous(path: str) -> dict[str, dict]:
    """Load the previously saved feed state from *path*.

    Args:
        path: Location of the JSON state file.

    Returns:
        The decoded JSON content, or ``{}`` when the file does not exist.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
    """
    # Open directly instead of testing os.path.exists() first, so a file
    # removed between the check and the open cannot raise.
    try:
        with open(path, "r", encoding="utf-8") as handle:
            return json.load(handle)
    except FileNotFoundError:
        return {}
def write_state(path: str, snapshots: list[FeedSnapshot]) -> None:
    """Write *snapshots* to *path* as indented JSON keyed by ``xml_url``.

    Each snapshot is serialised with ``dataclasses.asdict``. Non-ASCII text
    is written verbatim and the file ends with a newline. When two snapshots
    share an ``xml_url`` the later one wins.
    """
    with open(path, "w", encoding="utf-8") as handle:
        state: dict[str, dict] = {}
        for snap in snapshots:
            state[snap.xml_url] = asdict(snap)
        handle.write(json.dumps(state, ensure_ascii=False, indent=2))
        handle.write("\n")
def write_report(path: str, snapshots: list[FeedSnapshot], previous: dict[str, dict]) -> list[tuple[str, str, str]]:
    """Write the Markdown update report and return every newly seen entry.

    An entry is new when it has a non-empty URL that is absent from the
    feed's entries in *previous*. Feeds whose snapshot carries an error are
    listed with that error and contribute no entries.

    Args:
        path: Destination of the Markdown report.
        snapshots: Results of the current run.
        previous: State loaded from the prior run, keyed by ``xml_url``.

    Returns:
        ``(feed title, entry title, entry url)`` for every new entry across
        all feeds.
    """
    # The report shows at most this many new entries per feed. The cap is
    # display-only: previously it also truncated the returned list, so with
    # --limit above 10 the remaining new entries never reached pending
    # review even though the state file recorded them as seen.
    max_listed = 10

    new_entries_all: list[tuple[str, str, str]] = []
    lines = [
        "# Feed Update Report",
        "",
        f"Generated: {now_iso()}",
        "",
    ]
    for snap in snapshots:
        lines.append(f"## {snap.title}")
        lines.append("")
        lines.append(f"- Site: {snap.html_url or 'unknown'}")
        lines.append(f"- Feed: {snap.xml_url}")
        if snap.error:
            lines.append(f"- Status: error: {snap.error}")
            lines.append("")
            continue

        previous_entries = previous.get(snap.xml_url, {}).get("entries", [])
        previous_urls = {entry.get("url", "") for entry in previous_entries}
        new_entries = [
            entry for entry in snap.entries
            if entry.url and entry.url not in previous_urls
        ]
        lines.append(f"- Entries checked: {len(snap.entries)}")
        lines.append(f"- New since last snapshot: {len(new_entries)}")
        lines.append("")

        new_entries_all.extend(
            (snap.title, entry.title, entry.url) for entry in new_entries
        )
        for entry in new_entries[:max_listed]:
            lines.append(f"- {entry.published or 'unknown date'} — [{entry.title}]({entry.url})")
        if not new_entries:
            lines.append("- No new entries detected")
        lines.append("")

    with open(path, "w", encoding="utf-8") as handle:
        handle.write("\n".join(lines).rstrip() + "\n")
    return new_entries_all
def filter_pending_entries(
    entries: list[tuple[str, str, str]],
) -> list[tuple[str, str, str]]:
    """Drop entries whose source title is listed in ``_NO_PENDING_REVIEW``.

    Input order is preserved; each kept item is returned as a
    ``(source, title, url)`` tuple.
    """
    kept: list[tuple[str, str, str]] = []
    for src, title, url in entries:
        if src in _NO_PENDING_REVIEW:
            continue
        kept.append((src, title, url))
    return kept
def load_reviewed_urls(pending_path: str) -> set[str]:
    """Collect every URL already recorded in the pending-review JSONL file.

    Blank lines are ignored. Lines that are not valid JSON, or that decode to
    something other than a JSON object, are skipped with a warning on
    stderr instead of aborting the run.

    Args:
        pending_path: Path of the JSONL file; it may not exist yet.

    Returns:
        The string values of the ``url`` key of all object lines, or an
        empty set when the file is missing.
    """
    try:
        fh = open(pending_path, "r", encoding="utf-8")
    except FileNotFoundError:
        return set()

    urls: set[str] = set()
    with fh:
        for lineno, line in enumerate(fh, 1):
            line = line.strip()
            if not line:
                continue
            try:
                entry = json.loads(line)
            except json.JSONDecodeError as exc:
                print(f"WARNING: skipping malformed JSONL line {lineno}: {exc}", file=sys.stderr)
                continue
            if not isinstance(entry, dict):
                # Valid JSON that is not an object (e.g. 123, null, a list)
                # used to raise TypeError in the membership/index step.
                print(f"WARNING: skipping non-object JSONL line {lineno}", file=sys.stderr)
                continue
            url = entry.get("url")
            if isinstance(url, str):
                urls.add(url)
    return urls
def append_pending_review(
    pending_path: str,
    new_entries: list[tuple[str, str, str]],
    validate: bool = True,
) -> tuple[int, int]:
    """Append newly discovered entries to the pending-review JSONL file.

    URLs already present in the file are skipped, and so are repeats of a URL
    within *new_entries* itself. With *validate* enabled each remaining URL
    is probed with ``head_check``; URLs reported as ``"gone"`` are recorded
    with status ``"broken"`` and a dated note, all others with status
    ``"pending"``.

    Args:
        pending_path: Path of the JSONL file.
        new_entries: ``(source, title, url)`` tuples to consider.
        validate: When false, no HEAD request is made and every entry is
            recorded as ``"pending"``.

    Returns:
        ``(selected, broken)``: the number of entries selected for appending
        and how many of those were recorded as broken.
    """
    if not new_entries:
        return 0, 0

    known_urls = load_reviewed_urls(pending_path)
    to_add: list[tuple[str, str, str]] = []
    for src, title, url in new_entries:
        if url in known_urls:
            continue
        # Remember the URL so a second feed carrying the same link in this
        # batch does not produce a duplicate line (and a second HEAD check).
        known_urls.add(url)
        to_add.append((src, title, url))
    if not to_add:
        return 0, 0

    broken = 0
    today = today_str()
    entries_to_append: list[dict] = []
    for src, title, url in to_add:
        status, note = head_check(url) if validate else ("ok", "")
        is_gone = status == "gone"
        if is_gone:
            broken += 1
        entries_to_append.append({
            "url": url,
            "title": title,
            "source": src,
            "status": "broken" if is_gone else "pending",
            "discovered": today,
            "reviewed_date": None,
            "artifacts_found": None,
            "heuristics_found": None,
            "notes": f"{note} on {today}" if is_gone else "",
        })

    def _transform(content: str) -> str:
        """Return *content* plus the new lines whose URL it does not hold yet."""
        existing: set[str] = set()
        # Split on "\n" only: str.splitlines() also breaks on characters such
        # as U+2028, which json.dumps(ensure_ascii=False) writes verbatim
        # inside a line.
        for line in content.split("\n"):
            stripped = line.strip()
            if not stripped:
                continue
            try:
                entry = json.loads(stripped)
            except json.JSONDecodeError:
                continue
            # Non-object JSON lines are ignored rather than crashing.
            if isinstance(entry, dict) and isinstance(entry.get("url"), str):
                existing.add(entry["url"])

        new_lines = [
            json.dumps(e, ensure_ascii=False) + "\n"
            for e in entries_to_append
            if e["url"] not in existing
        ]
        if not new_lines:
            return content
        base = content if content.endswith("\n") or not content else content + "\n"
        return base + "".join(new_lines)

    # locked_write comes from pending_lock; it is given a callback that maps
    # the current file content to the new content.
    locked_write(pending_path, _transform)
    return len(to_add), broken
def revalidate_pending_urls(pending_path: str) -> tuple[int, int]:
    """HEAD-check every pending entry and mark the ones that are gone as broken.

    The file is read once and each line with status ``"pending"`` and a
    non-empty URL is probed with ``head_check``. Entries reported as
    ``"gone"`` get status ``"broken"`` and a dated annotation appended to
    their notes. Blank lines, malformed lines, non-object JSON lines and
    entries in any other status are left untouched.

    The rewrite goes through ``locked_write``, as the append path does, and
    is applied to the content current at write time, so lines appended while
    the network checks were running are not lost.

    Args:
        pending_path: Path of the JSONL file.

    Returns:
        ``(checked, newly_broken)``: how many URLs were probed and how many
        of them were found gone.
    """
    if not os.path.exists(pending_path):
        return 0, 0
    with open(pending_path, "r", encoding="utf-8") as fh:
        raw_lines = fh.readlines()

    def _pending_entry(raw_line: str) -> dict | None:
        """Return the decoded entry if the line is a pending entry with a URL."""
        stripped = raw_line.strip()
        if not stripped:
            return None
        try:
            entry = json.loads(stripped)
        except json.JSONDecodeError:
            return None
        # Non-object JSON (e.g. a bare number) used to crash on entry.get.
        if not isinstance(entry, dict) or entry.get("status") != "pending":
            return None
        url = entry.get("url", "")
        if not url or not isinstance(url, str):
            return None
        return entry

    # Phase 1: network checks, performed without touching the file.
    today = today_str()
    checked = 0
    newly_broken = 0
    gone_notes: dict[str, str] = {}
    for raw_line in raw_lines:
        entry = _pending_entry(raw_line)
        if entry is None:
            continue
        status, note = head_check(entry["url"])
        checked += 1
        if status == "gone":
            gone_notes[entry["url"]] = f"{note} on {today}"
            newly_broken += 1

    if not gone_notes:
        return checked, newly_broken

    def _mark_broken(content: str) -> str:
        """Rewrite pending lines whose URL was found gone; keep the rest as-is."""
        pieces = content.split("\n")
        lines = [piece + "\n" for piece in pieces[:-1]]
        if pieces[-1]:
            lines.append(pieces[-1])

        rewritten: list[str] = []
        for raw_line in lines:
            entry = _pending_entry(raw_line)
            annotation = gone_notes.get(entry["url"]) if entry is not None else None
            if annotation is None:
                rewritten.append(raw_line)
                continue
            existing_notes = entry.get("notes") or ""
            entry["notes"] = (
                existing_notes + "; " + annotation if existing_notes else annotation
            )
            entry["status"] = "broken"
            rewritten.append(json.dumps(entry, ensure_ascii=False) + "\n")
        return "".join(rewritten)

    # Phase 2: apply the status changes under the pending-file lock.
    locked_write(pending_path, _mark_broken)
    return checked, newly_broken
def main() -> int:
    """Command-line entry point: check all feeds and update state and reports.

    Steps:
      1. With ``--revalidate-pending``, re-probe pending URLs first.
      2. Unless ``--opml`` is empty, fetch every feed listed in the OPML
         file, keeping at most ``--limit`` entries per feed.
      3. Write the state file and the Markdown report, then append new
         entries to the pending-review file.

    Returns:
        Process exit code; always ``0``.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--opml", default="archive/sources/dfir-feeds.opml")
    parser.add_argument("--state", default="archive/sources/feed-state.json")
    parser.add_argument("--report", default="archive/sources/feed-report.md")
    parser.add_argument("--pending", default="archive/sources/pending-review.jsonl")
    parser.add_argument("--limit", type=int, default=10, help="max entries retained per feed")
    parser.add_argument(
        "--no-validate",
        action="store_true",
        help="skip HEAD-check for new URLs (faster; disables fix 1)",
    )
    parser.add_argument(
        "--revalidate-pending",
        action="store_true",
        help=(
            "HEAD-check every entry with status 'pending' in the pending-review "
            "JSONL file and mark 404/410 responses as 'broken' (fix 3)"
        ),
    )
    args = parser.parse_args()

    if args.revalidate_pending:
        checked, broken = revalidate_pending_urls(args.pending)
        print(f"revalidated {checked} pending URLs; {broken} newly marked [!]")
    if not args.opml:
        return 0

    previous = load_previous(args.state)
    snapshots: list[FeedSnapshot] = []
    for outline in iter_feed_outlines(args.opml):
        error = None
        entries: list[FeedEntry] = []
        try:
            payload = fetch(outline["xml_url"])
            if outline.get("kind") == "web":
                entries = parse_html_links(payload, outline["html_url"])[: args.limit]
            else:
                entries = parse_feed(payload)[: args.limit]
        except Exception as exc:
            # One failing feed must not abort the run; record the failure.
            error = str(exc)
            # Keep the entries known from the previous run. Saving an empty
            # list here would make every entry look new again once the feed
            # recovers.
            prior = previous.get(outline["xml_url"])
            prior_entries = prior.get("entries", []) if isinstance(prior, dict) else []
            entries = [
                FeedEntry(
                    title=str(old.get("title", "")),
                    url=str(old.get("url", "")),
                    published=str(old.get("published", "")),
                )
                for old in prior_entries
                if isinstance(old, dict)
            ]
        snapshots.append(
            FeedSnapshot(
                title=outline["title"],
                html_url=outline["html_url"],
                xml_url=outline["xml_url"],
                checked_at=now_iso(),
                entries=entries,
                error=error,
            )
        )

    # os.makedirs("") raises, so only create the directory when --state
    # actually has a directory component.
    state_dir = os.path.dirname(args.state)
    if state_dir:
        os.makedirs(state_dir, exist_ok=True)
    write_state(args.state, snapshots)
    new_entries = write_report(args.report, snapshots, previous)

    pending_entries = filter_pending_entries(new_entries)
    validate = not args.no_validate
    appended, broken = append_pending_review(args.pending, pending_entries, validate=validate)
    print(
        f"checked {len(snapshots)} feeds; "
        f"detected {len(new_entries)} new entries; "
        f"appended {appended} to pending-review.jsonl"
        + (f" ({broken} marked broken — URL already gone)" if broken else "")
    )
    return 0


if __name__ == "__main__":
    raise SystemExit(main())