import json
import os
import re
import sys
import time
import urllib.error
import urllib.request
import xml.etree.ElementTree as ET
from datetime import datetime, timezone
from typing import Callable
sys.path.insert(0, os.path.dirname(__file__))
from pending_lock import locked_write
def _load_dotenv(path: str = ".env") -> None:
    """Load ``KEY=VALUE`` pairs from a dotenv-style file into ``os.environ``.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    skipped.  Whitespace and surrounding quote characters are stripped from
    values.  Variables that already exist in the environment are never
    overwritten, and a missing file is silently ignored.

    Args:
        path: Location of the dotenv file (default: ``.env`` relative to the
            current working directory).
    """
    # isfile (not exists) so that a directory with this name is ignored
    # instead of crashing the import of this module.
    if not os.path.isfile(path):
        return
    # utf-8-sig decodes identically on every platform and drops a leading
    # BOM, which would otherwise become part of the first key.
    with open(path, encoding="utf-8-sig") as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, value = line.partition("=")
            key = key.strip()
            value = value.strip().strip('"').strip("'")
            if key:
                # The real environment wins over the file.
                os.environ.setdefault(key, value)


# Populate the environment at import time; values such as GITHUB_TOKEN and
# YOUTUBE_API_KEY are read from os.environ further down in this module.
_load_dotenv()
# Sent as the User-Agent header by _fetch() and head_check().
USER_AGENT = " ".join((
    "forensicnomicon-backfill/1.0",
    "(https://github.com/SecurityRonin/forensicnomicon; DFIR research)",
))

# HTTP status codes that head_check() reports as "gone".
_GONE_CODES = frozenset((404, 410))

# OPML source titles that main() skips without fetching.
_SKIP_TITLES = frozenset((
    "URLhaus",
    "MalwareBazaar",
    "ThreatFox",
    "abuse.ch blog",
    "MISP taxonomies",
    "LOLBAS Project (Windows)",
    "GTFOBins (Linux)",
    "LOOBins (macOS)",
    "LOLDrivers (BYOVD)",
    "LOFL Project (RMM C2 indicators)",
))

# NOTE(review): not referenced anywhere in the visible part of this file;
# presumably platforms that can only be read through their xmlUrl — confirm.
_XMLURL_ONLY_PLATFORMS = frozenset((
    "ghost.io",
    "squarespace.com",
    "hubspot.com",
    "jekyll",
    "hugo",
))

# XML namespace URI used to qualify Atom element names.
_ATOM_NS = "http://www.w3.org/2005/Atom"
def parse_blogger_feed(xml_text: str) -> list[tuple[str, str, str]]:
    """Extract ``(title, url, date)`` tuples from an Atom feed document.

    Only ``<link rel="alternate">`` elements are considered for the URL, and
    entries without such a link are dropped.

    Args:
        xml_text: Raw XML of the feed.

    Returns:
        One tuple per usable entry, in document order; an empty list when the
        XML cannot be parsed.
    """
    try:
        feed = ET.fromstring(xml_text)
    except ET.ParseError:
        return []

    def qualified(tag: str) -> str:
        # ElementTree addresses namespaced elements as "{uri}tag".
        return f"{{{_ATOM_NS}}}{tag}"

    results = []
    for node in feed.findall(qualified("entry")):
        heading = node.find(qualified("title"))
        if heading is not None and heading.text:
            title = heading.text.strip()
        else:
            title = ""

        # First alternate link wins; a missing href yields an empty URL.
        url = next(
            (
                link.get("href", "")
                for link in node.findall(qualified("link"))
                if link.get("rel") == "alternate"
            ),
            "",
        )
        date = _parse_atom_date(node, _ATOM_NS)
        if url:
            results.append((title, url, date))
    return results
def parse_wordpress_posts(json_text: str) -> list[tuple[str, str, str]]:
    """Convert a JSON array of post objects into ``(title, url, date)`` tuples.

    Each post is expected to carry ``title.rendered``, ``link`` and optionally
    ``date``.  The rendered title is HTML: tags are removed and character
    references (``&#8217;``, ``&amp;`` ...) are decoded so the stored title is
    plain text.  Posts with a missing field, an unexpected shape or an empty
    link are skipped.

    Args:
        json_text: Response body to parse.

    Returns:
        One tuple per usable post; an empty list when the body is not valid
        JSON or is not a JSON array.  ``date`` is the first ten characters of
        the post's ``date`` field, or ``""`` when absent.
    """
    import html  # local import: not part of this module's top-level imports

    try:
        posts = json.loads(json_text)
    except (json.JSONDecodeError, ValueError, TypeError):
        return []
    if not isinstance(posts, list):
        return []

    entries = []
    for post in posts:
        try:
            rendered = post["title"]["rendered"]
            # Strip tags first, then decode entities, so an escaped "&lt;b&gt;"
            # survives as literal text instead of being removed as a tag.
            title = html.unescape(re.sub(r"<[^>]+>", "", rendered)).strip()
            url = post["link"]
            raw_date = post.get("date", "")
            date = raw_date[:10] if raw_date else ""
        except (KeyError, TypeError):
            continue
        if url:
            entries.append((title, url, date))
    return entries
def parse_github_commits(json_text: str) -> list[tuple[str, str, str]]:
    """Convert a JSON array of commit objects into ``(title, url, date)`` tuples.

    The title is the first line of ``commit.message``, the URL is ``html_url``
    and the date is the first ten characters of ``commit.author.date``.

    Args:
        json_text: Response body to parse.

    Returns:
        One tuple per commit that has a non-empty title and URL.  An empty
        list is returned when the body is not valid JSON or is not a JSON
        array (for example an error object); malformed items are skipped.
    """
    try:
        data = json.loads(json_text)
    except (json.JSONDecodeError, ValueError, TypeError):
        return []
    # Iterating a dict would yield its string keys and crash on indexing.
    if not isinstance(data, list):
        return []

    entries = []
    for item in data:
        try:
            message = item["commit"]["message"]
            title = message.splitlines()[0].strip()
            url = item["html_url"]
            date = item["commit"]["author"]["date"][:10]
        except (KeyError, IndexError, TypeError, AttributeError):
            # Missing key, empty message, null sub-object or non-dict item.
            continue
        if title and url:
            entries.append((title, url, date))
    return entries
def parse_atom_feed(xml_text: str) -> list[tuple[str, str, str]]:
    """Extract ``(title, url, date)`` tuples from an Atom feed document.

    The entry URL is taken from the first ``<link>`` whose ``rel`` is
    ``alternate`` or absent (an absent ``rel`` means ``alternate`` in Atom).
    When an entry has no such link, the first link that carries an ``href``
    is used instead, so feeds that only publish other relations still work.

    Args:
        xml_text: Raw XML of the feed.

    Returns:
        One tuple per entry that has a URL, in document order; an empty list
        when the XML cannot be parsed.
    """
    try:
        root = ET.fromstring(xml_text)
    except ET.ParseError:
        return []

    ns = _ATOM_NS
    entries = []
    for entry in root.findall(f"{{{ns}}}entry"):
        title_el = entry.find(f"{{{ns}}}title")
        title = title_el.text.strip() if title_el is not None and title_el.text else ""

        url = ""
        fallback = ""
        for link in entry.findall(f"{{{ns}}}link"):
            href = link.get("href", "")
            if not href:
                continue
            if link.get("rel", "alternate") == "alternate":
                url = href
                break
            # Remember the first non-alternate link (self, replies, edit ...)
            # in case the entry offers nothing better.
            if not fallback:
                fallback = href
        url = url or fallback

        date = _parse_atom_date(entry, ns)
        if url:
            entries.append((title, url, date))
    return entries
def parse_rss_xml(xml_text: str) -> list[tuple[str, str, str]]:
    """Parse a feed document, dispatching between the Atom and RSS parsers.

    The document is treated as Atom when the root tag contains the Atom
    namespace or when the root has Atom ``entry`` children; otherwise it is
    handed to the RSS 2.0 parser.

    Args:
        xml_text: Raw XML of the feed.

    Returns:
        ``(title, url, date)`` tuples; an empty list for empty input or XML
        that cannot be parsed.
    """
    if not xml_text:
        return []
    try:
        document = ET.fromstring(xml_text)
    except ET.ParseError:
        return []

    looks_like_atom = _ATOM_NS in (document.tag or "") or bool(
        document.findall(f"{{{_ATOM_NS}}}entry")
    )
    if looks_like_atom:
        return parse_atom_feed(xml_text)
    return _parse_rss2(document)
def _parse_rss2(root: ET.Element) -> list[tuple[str, str, str]]:
    """Collect ``(title, url, date)`` tuples from the ``<item>`` elements of RSS.

    Args:
        root: Parsed document root; either an element with ``<channel>``
            children or a ``<channel>`` element itself.

    Returns:
        One tuple per item with a non-empty ``<link>``.  ``date`` is the
        ``<pubDate>`` formatted as ``YYYY-MM-DD``; when it cannot be parsed
        as an e-mail style date, its first ten characters are used if it is
        at least that long, otherwise ``""``.
    """
    import email.utils

    def text_of(parent: ET.Element, tag: str) -> str:
        # Stripped text of the first matching child, or "" when absent/empty.
        node = parent.find(tag)
        if node is None:
            return ""
        return (node.text or "").strip()

    channels = root.findall("channel")
    if not channels and root.tag == "channel":
        channels = [root]

    collected = []
    for channel in channels:
        for item in channel.findall("item"):
            url = text_of(item, "link")
            if not url:
                continue
            stamp = text_of(item, "pubDate")
            date = ""
            if stamp:
                try:
                    date = email.utils.parsedate_to_datetime(stamp).strftime("%Y-%m-%d")
                except Exception:
                    # Not an RFC 2822 date; keep a leading ISO-style prefix.
                    date = stamp[:10] if len(stamp) >= 10 else ""
            collected.append((text_of(item, "title"), url, date))
    return collected
def _parse_atom_date(entry: ET.Element, ns: str) -> str:
    """Return the date of an Atom entry as ``YYYY-MM-DD``.

    ``<published>`` is tried before ``<updated>``.  Fractional seconds are
    removed and a trailing ``Z`` is rewritten as ``+00:00`` before ISO
    parsing.  A value that does not parse contributes its first ten
    characters when it is at least that long; otherwise the next element is
    tried.

    Args:
        entry: The ``<entry>`` element.
        ns: Namespace URI used to qualify the child tag names.

    Returns:
        The date string, or ``""`` when no usable date element exists.
    """
    for name in ("published", "updated"):
        node = entry.find(f"{{{ns}}}{name}")
        if node is None or not node.text:
            continue
        stamp = re.sub(r"\.\d+", "", node.text.strip())
        try:
            parsed = datetime.fromisoformat(stamp.replace("Z", "+00:00"))
        except ValueError:
            if len(stamp) >= 10:
                return stamp[:10]
            continue
        return parsed.strftime("%Y-%m-%d")
    return ""
# Matches one pending-review entry, "- [<mark>] [<title>](<url>) ...", and
# captures the URL.  The title part is matched lazily up to the first "]("
# so that titles which themselves contain "]" (e.g. "[Webcast] Foo") are
# still recognised.
_PENDING_URL_RE = re.compile(r"^\- \[[^\]]*\] \[.*?\]\(([^)]+)\)")


def load_seen_urls(pending_path: str) -> set[str]:
    """Return the set of URLs already listed in the pending-review file.

    Args:
        pending_path: Path of the markdown file to scan.

    Returns:
        Every URL captured by ``_PENDING_URL_RE``.  The set is empty when the
        file does not exist, and holds whatever was collected so far when
        reading fails part-way with an ``OSError``.
    """
    seen: set[str] = set()
    if not os.path.exists(pending_path):
        return seen
    try:
        # Entries contain non-ASCII characters ("—", "→"), so the encoding
        # must not depend on the platform default; undecodable bytes are
        # replaced rather than aborting the scan.
        with open(pending_path, encoding="utf-8", errors="replace") as f:
            for line in f:
                m = _PENDING_URL_RE.match(line)
                if m:
                    seen.add(m.group(1))
    except OSError:
        pass
    return seen
def classify_blog_source(html_url: str) -> str:
    """Guess the hosting platform of a site from substrings of its URL.

    The comparison is case-insensitive and the first matching rule wins.

    Args:
        html_url: Site URL to inspect.

    Returns:
        ``"blogger"``, ``"github"``, ``"squarespace"`` or ``"unknown"``.
    """
    haystack = html_url.lower()
    # Ordered: earlier platforms take precedence when several needles match.
    rules = (
        ("blogger", ("blogspot.com", "blogger.com")),
        ("github", ("github.com",)),
        ("squarespace", ("squarespace.com", "squarespace-cdn.com")),
    )
    for label, needles in rules:
        if any(needle in haystack for needle in needles):
            return label
    return "unknown"
def parse_sans_blog_html(html_text: str) -> list[tuple[str, str, str]]:
    """Extract blog posts from the JSON ``"hits"`` array embedded in a page.

    The page is searched for the literal ``"hits":[`` and the JSON array that
    starts there is decoded.  Only hits whose ``contentType`` is
    ``blog_single_page`` and that carry a non-empty ``title``, ``url`` and
    ``createdAt`` are returned.  URLs that do not start with ``https://`` are
    prefixed with ``https://www.sans.org``.

    Args:
        html_text: Page source.

    Returns:
        ``(title, url, date)`` tuples where ``date`` is the first ten
        characters of ``createdAt``; an empty list when the marker is missing
        or the array is not valid JSON.
    """
    if not html_text:
        return []
    start_marker = '"hits":['
    idx = html_text.find(start_marker)
    if idx == -1:
        return []
    array_start = idx + len(start_marker) - 1  # index of the opening "["

    # Let the JSON decoder find the end of the array.  Counting brackets by
    # hand breaks as soon as a string value contains "[" or "]".
    try:
        hits, _end = json.JSONDecoder().raw_decode(html_text, array_start)
    except (json.JSONDecodeError, ValueError):
        return []
    if not isinstance(hits, list):
        return []

    entries = []
    for hit in hits:
        if not isinstance(hit, dict) or hit.get("contentType") != "blog_single_page":
            continue
        title = hit.get("title")
        url_path = hit.get("url")
        created = hit.get("createdAt")
        # Skip hits whose fields are missing, null or not strings.
        if not all(isinstance(v, str) for v in (title, url_path, created)):
            continue
        title = title.strip()
        if not (title and url_path and created):
            continue
        if not url_path.startswith("https://"):
            url_path = "https://www.sans.org" + url_path
        entries.append((title, url_path, created[:10]))
    return entries
def parse_dfir_training_html(html_text: str) -> list[tuple[str, str, str]]:
    """Collect blog post links from a listing page of www.dfir.training.

    Every distinct ``href`` starting with ``/blog/`` is reported once, in
    order of first appearance; paths under ``categories/``, ``blogger/`` and
    ``tags/`` are excluded.  The title is the text of the first anchor with
    that ``href`` (single- or double-quoted) that contains non-blank text
    directly after the opening tag.  When no such anchor exists the title is
    derived from the last path segment.

    Args:
        html_text: Page source.

    Returns:
        ``(title, url, "")`` tuples; the page offers no date, so the third
        field is always empty.
    """
    import html  # local import: not part of this module's top-level imports

    if not html_text:
        return []

    seen: set[str] = set()
    entries: list[tuple[str, str, str]] = []
    for m in re.finditer(
        r'href=["\'](/blog/((?!categories/|blogger/|tags/)[^"\'#?]+))["\']',
        html_text,
    ):
        path = m.group(1)
        if path in seen:
            continue
        seen.add(path)
        url = f"https://www.dfir.training{path}"

        # Accept either quote style, and require real text so that anchors
        # wrapping an image or only whitespace are passed over.
        anchor = re.search(
            r'href=["\']' + re.escape(path) + r'["\'][^>]*>\s*([^<\s][^<]*)<',
            html_text,
        )
        title_text = html.unescape(anchor.group(1)).strip() if anchor else ""
        if not title_text:
            # rstrip("/") keeps a trailing slash from producing an empty slug.
            slug = path.rstrip("/").rsplit("/", 1)[-1]
            title_text = slug.replace("-", " ").title()
        entries.append((title_text, url, ""))
    return entries
def fetch_sans_blog(max_pages: int = 80) -> list[tuple[str, str, str]]:
    """Walk the paginated SANS blog listing and gather its posts.

    Pages ``1..max_pages`` of ``https://www.sans.org/blog?page=N`` are
    fetched in order.  The walk stops at the first page that cannot be
    fetched, yields no posts, or yields only URLs collected from earlier
    pages.

    Args:
        max_pages: Upper bound on the number of listing pages requested.

    Returns:
        ``(title, url, date)`` tuples accumulated across pages.
    """
    collected: list[tuple[str, str, str]] = []
    known: set[str] = set()
    page_no = 1
    while page_no <= max_pages:
        body = _fetch(f"https://www.sans.org/blog?page={page_no}")
        parsed = parse_sans_blog_html(body) if body else []
        # An empty result covers all three stop conditions: fetch failure,
        # nothing parsed, or nothing that was not already collected.
        fresh = [item for item in parsed if item[1] not in known]
        if not fresh:
            break
        collected += fresh
        known |= {item[1] for item in fresh}
        page_no += 1
    return collected
def detect_is_wordpress(xml_text: str) -> bool:
    """Report whether a feed document mentions ``wordpress.org``.

    Args:
        xml_text: Raw feed text; may be empty.

    Returns:
        ``True`` when the text contains ``wordpress.org`` (case-insensitive),
        ``False`` otherwise, including for empty input.
    """
    return bool(xml_text) and "wordpress.org" in xml_text.lower()
def _should_try_wordpress(entries: list, xml_url: str) -> bool:
    """Decide whether the WordPress archive fetch should be attempted.

    Args:
        entries: Entries obtained from the feed so far.
        xml_url: Feed URL of the source; may be empty.

    Returns:
        ``True`` only when a feed URL is configured and it produced no
        entries.
    """
    return bool(xml_url) and len(entries) == 0
def check_related_gaps(artifact_id: str, co_occurring_ids: list[str]) -> list[str]:
    """List co-occurring artifact IDs missing from an artifact's ``related``.

    The catalog is obtained by running the ``forensicnomicon-cli`` crate via
    ``cargo run`` from the directory above this file and parsing its JSON
    output.

    Args:
        artifact_id: ID of the catalog entry to inspect.
        co_occurring_ids: IDs observed alongside ``artifact_id``.

    Returns:
        The IDs from ``co_occurring_ids`` (order and duplicates preserved)
        that are neither ``artifact_id`` itself nor already in its
        ``related`` list.  An empty list is returned when there is nothing to
        check, the command fails, times out or prints invalid JSON, or the
        artifact is not in the catalog.
    """
    import subprocess

    if not co_occurring_ids:
        return []

    here = os.path.dirname(__file__)
    repo_root = os.path.abspath(os.path.join(here, ".."))
    command = [
        "cargo", "run", "-q", "-p", "forensicnomicon-cli", "--",
        "dump", "--format", "json", "--dataset", "catalog",
    ]
    try:
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            cwd=repo_root,
            timeout=60,
        )
        if completed.returncode != 0:
            return []
        catalog = json.loads(completed.stdout).get("catalog", [])
    except (subprocess.TimeoutExpired, json.JSONDecodeError, OSError):
        return []

    # Locate the first catalog entry with the requested ID.
    for artifact in catalog:
        if artifact.get("id") == artifact_id:
            linked = set(artifact.get("related", []))
            break
    else:
        return []

    missing = []
    for candidate in co_occurring_ids:
        if candidate == artifact_id or candidate in linked:
            continue
        missing.append(candidate)
    return missing
def _fetch(url: str, timeout: int = 30) -> str | None:
    """GET a URL and return the response body as text.

    The body is decoded with the charset announced in the response headers
    (UTF-8 when none is given); undecodable bytes are replaced.

    Args:
        url: Address to fetch.
        timeout: Timeout in seconds passed to ``urlopen``.

    Returns:
        The decoded body, or ``None`` when anything goes wrong.  Failures are
        reported on stderr and never raised.
    """
    try:
        # Built inside the try block: Request() itself raises ValueError for
        # an empty or scheme-less URL, which must also yield None.
        req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            charset = resp.headers.get_content_charset() or "utf-8"
            return resp.read().decode(charset, errors="replace")
    except Exception as exc:
        # Deliberately broad: a failed fetch is best-effort and must not
        # abort the whole run.
        print(f"  [WARN] fetch failed: {url} — {exc}", file=sys.stderr)
        return None
def head_check(url: str, timeout: int = 15) -> tuple[str, str]:
    """Probe a URL with an HTTP HEAD request and classify the outcome.

    Args:
        url: Address to probe.
        timeout: Timeout in seconds passed to ``urlopen``.

    Returns:
        A ``(status, note)`` pair.  ``status`` is ``"ok"`` when the request
        succeeds, ``"gone"`` when the server answers with a code listed in
        ``_GONE_CODES``, and ``"skip"`` for every other failure.  ``note`` is
        ``"HTTP <code>"`` or, for non-HTTP errors, the exception text.
    """
    try:
        probe = urllib.request.Request(
            url,
            method="HEAD",
            headers={"User-Agent": USER_AGENT},
        )
        with urllib.request.urlopen(probe, timeout=timeout) as response:
            return "ok", f"HTTP {response.getcode()}"
    except urllib.error.HTTPError as http_err:
        verdict = "gone" if http_err.code in _GONE_CODES else "skip"
        return verdict, f"HTTP {http_err.code}"
    except Exception as other:
        # Timeouts, DNS failures, malformed URLs ...: inconclusive.
        return "skip", str(other)
def fetch_blogger_archive(
    feed_url: str,
    max_pages: int = 50,
) -> list[tuple[str, str, str]]:
    """Page through a feed using ``max-results`` / ``start-index`` parameters.

    Pages of 150 entries are requested until a page cannot be fetched, parses
    to nothing, comes back shorter than the page size, or ``max_pages`` is
    reached.

    Args:
        feed_url: Base URL of the feed; it may already contain a query
            string.
        max_pages: Upper bound on the number of pages requested.

    Returns:
        ``(title, url, date)`` tuples from all pages, in fetch order.
    """
    all_entries: list[tuple[str, str, str]] = []
    page_size = 150
    start = 1  # start-index is 1-based
    # Extend an existing query string instead of starting a second one.
    separator = "&" if "?" in feed_url else "?"
    for _ in range(max_pages):
        url = f"{feed_url}{separator}max-results={page_size}&start-index={start}"
        text = _fetch(url)
        if not text:
            break
        page = parse_blogger_feed(text)
        if not page:
            break
        all_entries.extend(page)
        if len(page) < page_size:
            # A short page is taken to be the last one.
            break
        start += page_size
    return all_entries
def fetch_wordpress_archive(
    html_url: str,
    max_pages: int = 50,
) -> list[tuple[str, str, str]]:
    """Fetch a site's post archive via the WordPress REST API or paged feeds.

    The REST endpoint ``/wp-json/wp/v2/posts`` is tried first, 100 posts per
    page.  If it yields nothing, the feed paths ``/feed/``, ``/rss/`` and
    ``/?feed=rss2`` are tried in turn with a ``paged=N`` parameter, and the
    first path that yields entries is used.

    Args:
        html_url: Site base URL; a trailing slash is ignored.
        max_pages: Upper bound on pages requested per endpoint.

    Returns:
        ``(title, url, date)`` tuples, or an empty list when no endpoint
        produced any entry.
    """
    base = html_url.rstrip("/")
    all_entries: list[tuple[str, str, str]] = []
    per_page = 100

    for page in range(1, max_pages + 1):
        api_url = (
            f"{base}/wp-json/wp/v2/posts"
            f"?per_page={per_page}&page={page}&_fields=title,link,date"
        )
        text = _fetch(api_url)
        if not text:
            break
        try:
            posts = json.loads(text)
        except json.JSONDecodeError:
            break
        if not isinstance(posts, list) or not posts:
            break
        all_entries.extend(parse_wordpress_posts(text))
        if len(posts) < per_page:
            break
    if all_entries:
        return all_entries

    for feed_path in ("/feed/", "/rss/", "/?feed=rss2"):
        feed_url = base + feed_path
        # "/?feed=rss2" already has a query string; extend it with "&".
        separator = "&" if "?" in feed_url else "?"
        seen_urls: set[str] = set()
        for page in range(1, max_pages + 1):
            text = _fetch(f"{feed_url}{separator}paged={page}")
            if not text:
                break
            # parse_rss_xml accepts both RSS 2.0 and Atom; the Atom-only
            # Blogger parser returns nothing for an RSS document.
            page_entries = parse_rss_xml(text)
            fresh = []
            for entry in page_entries:
                if entry[1] not in seen_urls:
                    seen_urls.add(entry[1])
                    fresh.append(entry)
            if not fresh:
                # Empty page, or a site that ignores "paged" and keeps
                # serving the same items.
                break
            all_entries.extend(fresh)
            if len(page_entries) < 10:
                break
        if all_entries:
            break
    return all_entries
def fetch_youtube_transcript(video_id: str) -> str | None:
try:
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api._errors import (
NoTranscriptFound,
TranscriptsDisabled,
VideoUnavailable,
)
except ImportError:
return None
try:
transcript = YouTubeTranscriptApi.get_transcript(video_id)
return " ".join(segment["text"] for segment in transcript)
except (NoTranscriptFound, TranscriptsDisabled, VideoUnavailable):
return None
except Exception:
return None
def fetch_atom_first_page(feed_url: str) -> list[tuple[str, str, str]]:
    """Fetch a single Atom document and parse its entries (no pagination).

    Args:
        feed_url: URL of the Atom feed.

    Returns:
        ``(title, url, date)`` tuples, or an empty list when the fetch fails
        or returns an empty body.
    """
    body = _fetch(feed_url)
    return parse_atom_feed(body) if body else []
def fetch_github_commits(repo_url: str, max_pages: int = 100) -> list[tuple[str, str, str]]:
    """Fetch the commit history of a GitHub repository through the REST API.

    The ``owner/repo`` slug is taken from the first two path segments after
    ``github.com/``.  Commits are requested 100 per page.  When the
    ``GITHUB_TOKEN`` environment variable is set it is sent as a bearer
    token.

    Args:
        repo_url: Any URL that contains ``github.com/<owner>/<repo>``.
        max_pages: Upper bound on the number of pages requested.

    Returns:
        ``(title, url, date)`` tuples, oldest pages last.  An empty list is
        returned when ``repo_url`` does not reference a repository; a request
        failure ends the walk and returns what was collected so far.
    """
    # Stop the repo segment at "?" / "#" so query strings and fragments do
    # not leak into the slug.
    m = re.search(r"github\.com/([^/]+/[^/?#]+)", repo_url)
    if not m:
        return []
    owner_repo = m.group(1).removesuffix(".git")

    token = os.environ.get("GITHUB_TOKEN", "")
    headers = {
        "Accept": "application/vnd.github+json",
        "User-Agent": USER_AGENT,
    }
    if token:
        headers["Authorization"] = f"Bearer {token}"

    all_entries: list[tuple[str, str, str]] = []
    for page in range(1, max_pages + 1):
        api_url = f"https://api.github.com/repos/{owner_repo}/commits?per_page=100&page={page}"
        req = urllib.request.Request(api_url, headers=headers)
        try:
            with urllib.request.urlopen(req, timeout=20) as resp:
                text = resp.read().decode("utf-8", errors="replace")
                link_header = resp.headers.get("Link", "") or ""
        except (urllib.error.URLError, OSError) as exc:
            # The token travels in a header, so the URL is safe to print.
            print(f"  [WARN] fetch failed: {api_url} — {exc}", file=sys.stderr)
            break
        entries = parse_github_commits(text)
        if not entries:
            break
        all_entries.extend(entries)
        # GitHub announces further pages with a rel="next" Link header; when
        # it is absent this was the last page and no extra request is needed.
        if 'rel="next"' not in link_header:
            break
    return all_entries
def fetch_youtube_channel(channel_id: str, api_key: str, max_results: int = 5000) -> list[tuple[str, str, str]]:
    """List the uploads of a YouTube channel via the YouTube Data API v3.

    The channel's uploads playlist is looked up first, then the playlist is
    paged through 50 items at a time.  Items titled ``Private video`` or
    ``Deleted video`` and items without a video ID are skipped.

    Args:
        channel_id: ID of the channel.
        api_key: API key sent as the ``key`` query parameter.
        max_results: Maximum number of videos to return.

    Returns:
        At most ``max_results`` tuples ``(title, watch_url, date)`` where
        ``date`` is the first ten characters of ``publishedAt``.  An empty
        list is returned when the channel lookup fails; a failure while
        paging returns what was collected so far.
    """
    from urllib.parse import urlencode  # local import: builds safe queries

    api_base = "https://www.googleapis.com/youtube/v3"
    channel_url = f"{api_base}/channels?" + urlencode(
        {"part": "contentDetails", "id": channel_id, "key": api_key}
    )
    text = _fetch(channel_url)
    if not text:
        return []
    try:
        data = json.loads(text)
        uploads_playlist = (
            data["items"][0]["contentDetails"]["relatedPlaylists"]["uploads"]
        )
    except (KeyError, IndexError, TypeError, json.JSONDecodeError) as exc:
        print(f"  [WARN] YouTube channel lookup failed: {exc}", file=sys.stderr)
        return []

    entries: list[tuple[str, str, str]] = []
    page_token = ""
    while len(entries) < max_results:
        params = {
            "part": "snippet",
            "playlistId": uploads_playlist,
            "maxResults": 50,
            "key": api_key,
        }
        if page_token:
            params["pageToken"] = page_token
        text = _fetch(f"{api_base}/playlistItems?" + urlencode(params))
        if not text:
            break
        try:
            page = json.loads(text)
        except json.JSONDecodeError:
            break
        if not isinstance(page, dict):
            break
        for item in page.get("items", []):
            snippet = item.get("snippet", {})
            title = snippet.get("title", "")
            video_id = snippet.get("resourceId", {}).get("videoId", "")
            published = snippet.get("publishedAt", "")[:10]
            if video_id and title != "Private video" and title != "Deleted video":
                url = f"https://www.youtube.com/watch?v={video_id}"
                entries.append((title, url, published))
        page_token = page.get("nextPageToken", "")
        if not page_token:
            break
    # The last page may push the count past the limit; trim to the contract.
    return entries[:max_results] if max_results > 0 else []
def read_opml(opml_path: str) -> list[dict]:
    """Read the feed sources listed in an OPML file.

    Every ``<outline>`` element, at any depth, whose ``type`` is ``rss`` or
    ``web`` becomes one source; other outlines (such as grouping folders) are
    ignored.

    Args:
        opml_path: Path of the OPML file.

    Returns:
        A list of dicts with the keys ``title``, ``xml_url``, ``html_url``
        and ``kind``.  ``title`` comes from the ``text`` attribute, falling
        back to ``title`` when ``text`` is missing or empty; missing URL
        attributes become ``""``.

    Raises:
        OSError: The file cannot be opened.
        xml.etree.ElementTree.ParseError: The file is not well-formed XML.
    """
    root = ET.parse(opml_path).getroot()
    sources = []
    for outline in root.iter("outline"):
        kind = outline.get("type", "")
        if kind not in ("rss", "web"):
            continue
        sources.append({
            # "or" rather than a .get() default so that text="" does not
            # hide a usable title attribute.
            "title": outline.get("text") or outline.get("title", ""),
            "xml_url": outline.get("xmlUrl", ""),
            "html_url": outline.get("htmlUrl", ""),
            "kind": kind,
        })
    return sources
def today_str() -> str:
    """Return the current UTC date formatted as ``YYYY-MM-DD``."""
    now_utc = datetime.now(tz=timezone.utc)
    return now_utc.date().isoformat()
def rescan_reviewed_entries(pending_path: str) -> int:
    """Turn every reviewed ``- [x]`` entry back into an open ``- [ ]`` entry.

    The rewrite is delegated to ``locked_write``, which is handed a function
    mapping the current file content to the new content.  Lines that do not
    start with ``- [x] `` are left untouched.

    Args:
        pending_path: Path of the pending-review markdown file.

    Returns:
        The number of entries that were re-queued; ``0`` when the file does
        not exist.
    """
    if not os.path.exists(pending_path):
        return 0

    checked = re.compile(r"^- \[x\] (.*)")
    requeued = 0

    def _transform(content: str) -> str:
        nonlocal requeued
        hits = 0
        rewritten = []
        for raw in content.splitlines(keepends=True):
            found = checked.match(raw)
            if found is None:
                rewritten.append(raw)
                continue
            hits += 1
            rewritten.append(f"- [ ] {found.group(1)}\n")
        # Assigned, not accumulated: the count reflects only the latest
        # invocation of this function.
        requeued = hits
        return "".join(rewritten)

    locked_write(pending_path, _transform)
    return requeued
def append_to_pending(
    pending_path: str,
    entries: list[tuple[str, str, str]],
    source_title: str,
    validate: bool = True,
    dry_run: bool = False,
) -> tuple[int, int]:
    """Append new entries to the pending-review file as markdown list items.

    Each entry becomes ``- [ ] [title](url) — source``.  With ``validate``
    enabled every URL is probed with ``head_check``; URLs reported as
    ``gone`` are written as ``- [!]`` items annotated with the HTTP note and
    today's date.

    Args:
        pending_path: Path of the pending-review markdown file; missing
            parent directories are created.
        entries: ``(title, url, date)`` tuples; the date is not written.
        source_title: Name of the source, appended to every line.
        validate: Probe each URL before writing it.
        dry_run: Print the lines to stdout instead of writing the file.

    Returns:
        ``(appended, broken)``: the number of ``[ ]`` and ``[!]`` lines
        produced.
    """
    appended = 0
    broken = 0
    lines = []
    for title, url, _date in entries:
        # Collapse newlines and runs of whitespace so that one entry always
        # occupies exactly one line of the file.
        title = " ".join(title.split())
        status, note = head_check(url) if validate else ("ok", "")
        if status == "gone":
            line = f"- [!] [{title}]({url}) — {source_title} <!-- {note} on {today_str()} -->\n"
            broken += 1
        else:
            line = f"- [ ] [{title}]({url}) — {source_title}\n"
            appended += 1
        lines.append(line)

    if dry_run:
        for line in lines:
            print(line, end="")
        return appended, broken

    if lines:
        os.makedirs(os.path.dirname(os.path.abspath(pending_path)), exist_ok=True)
        block = "".join(lines)

        def _append(content: str) -> str:
            # Assumes locked_write passes the current file content as a str
            # (as the rescan transform in this file does) — TODO confirm.
            # Start on a fresh line when the existing content does not end
            # with a newline.
            separator = "" if not content or content.endswith("\n") else "\n"
            return content + separator + block

        locked_write(pending_path, _append)
    return appended, broken
def _build_arg_parser():
    """Build the command-line parser for the backfill script.

    Returns:
        An ``argparse.ArgumentParser`` with the options ``--opml``,
        ``--pending``, ``--state``, ``--source``, ``--no-validate``,
        ``--dry-run``, ``--max-pages``, ``--youtube-api-key`` and
        ``--rescan``.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="Full-archive backfill for DFIR blog sources",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    # Declared as data; the order below is the order shown in --help.
    option_table = (
        ("--opml", {
            "default": "archive/sources/dfir-feeds.opml",
            "help": "OPML feed file",
        }),
        ("--pending", {
            "default": "archive/sources/pending-review.md",
            "help": "pending-review.md path",
        }),
        ("--state", {
            "default": "archive/sources/feed-state.json",
            "help": "feed-state.json path",
        }),
        ("--source", {
            "default": None,
            "help": "Limit to one source (partial match on title)",
        }),
        ("--no-validate", {
            "action": "store_true",
            "help": "Skip HEAD-checking new URLs",
        }),
        ("--dry-run", {
            "action": "store_true",
            "help": "Print what would be added, don't write",
        }),
        ("--max-pages", {
            "type": int,
            "default": 50,
            "help": "Max pagination pages per source (default: 50)",
        }),
        ("--youtube-api-key", {
            # The environment variable supplies the default when set.
            "default": os.environ.get("YOUTUBE_API_KEY", ""),
            "help": "YouTube Data API v3 key for full channel history (env: YOUTUBE_API_KEY)",
        }),
        ("--rescan", {
            "action": "store_true",
            "help": (
                "Re-queue all [x] entries as [ ] so they are re-reviewed in full. "
                "Does not affect [ ], [→], or [!] entries."
            ),
        }),
    )
    for flag, settings in option_table:
        parser.add_argument(flag, **settings)
    return parser
def _scrape_generic_links(html_url: str) -> list[tuple[str, str, str]]:
    """Scrape same-site ``/post/``, ``/blog/`` and ``/articles/`` links from a page."""
    from urllib.parse import urlsplit

    # urlsplit instead of html_url.split("/")[2]: a URL without a scheme then
    # yields an empty host rather than an IndexError.
    domain = urlsplit(html_url).netloc
    if not domain:
        return []
    html_text = _fetch(html_url)
    if not html_text:
        return []

    hrefs = re.findall(r'href=["\'](/(?:post|blog|articles)/[^"\'#?]+)["\']', html_text)
    entries: list[tuple[str, str, str]] = []
    seen_paths: set[str] = set()
    for href in hrefs:
        if href in seen_paths:
            continue
        seen_paths.add(href)
        # No title is available here; derive one from the last path segment.
        slug = href.rstrip("/").rsplit("/", 1)[-1].replace("-", " ").title()
        entries.append((slug, f"https://{domain}{href}", ""))
    return entries


def _fetch_source_entries(src: dict, args) -> list[tuple[str, str, str]]:
    """Fetch all entries of one OPML source using the fetcher that fits it."""
    xml_url = src["xml_url"]
    html_url = src["html_url"]

    if src.get("kind", "rss") == "web":
        if "sans.org" in html_url:
            return fetch_sans_blog(max_pages=args.max_pages)
        if "dfir.training" in html_url:
            html_text = _fetch(html_url)
            return parse_dfir_training_html(html_text) if html_text else []
        return _scrape_generic_links(html_url)

    platform = classify_blog_source(html_url)
    if platform == "blogger":
        if not xml_url:
            return []
        return fetch_blogger_archive(xml_url, max_pages=args.max_pages)
    if platform == "github":
        # NOTE(review): --max-pages is not forwarded here, so the fetcher's
        # own default applies — confirm whether that is intended.
        return fetch_github_commits(xml_url)

    if "youtube.com/feeds" in xml_url:
        yt_key = getattr(args, "youtube_api_key", "")
        channel_id_match = re.search(r"channel_id=([A-Za-z0-9_-]+)", xml_url)
        if yt_key and channel_id_match:
            return fetch_youtube_channel(channel_id_match.group(1), yt_key)
        if not yt_key:
            print("[no API key — fetching first page only]", end=" ", flush=True)
        return fetch_atom_first_page(xml_url)

    # Generic feed: parse it, then see whether WordPress can offer more.
    text = _fetch(xml_url) if xml_url else None
    entries = parse_rss_xml(text) if text else []
    if _should_try_wordpress(entries, xml_url):
        wp_entries = fetch_wordpress_archive(html_url, max_pages=args.max_pages)
        if wp_entries:
            entries = wp_entries
    elif entries and text and detect_is_wordpress(text):
        wp_entries = fetch_wordpress_archive(html_url, max_pages=args.max_pages)
        # Keep the feed result unless the archive is strictly larger.
        if len(wp_entries) > len(entries):
            entries = wp_entries
    return entries


def main(argv: list[str] | None = None) -> int:
    """Run the backfill: fetch every OPML source and queue its unseen URLs.

    With ``--rescan`` the reviewed entries of the pending file are re-queued
    and nothing is fetched.  Otherwise each source is fetched, URLs already
    present in the pending file (or seen earlier in this run) are dropped,
    and the rest is appended.

    Args:
        argv: Command-line arguments; ``None`` makes argparse use
            ``sys.argv``.

    Returns:
        Process exit code: ``0`` on success, ``1`` when the OPML file cannot
        be read.
    """
    parser = _build_arg_parser()
    args = parser.parse_args(argv)

    if args.rescan:
        n = rescan_reviewed_entries(args.pending)
        print(f"Re-queued {n} reviewed [x] entries as [ ]")
        print("Run /review-dfir-feeds to process them")
        return 0

    try:
        sources = read_opml(args.opml)
    except (OSError, ET.ParseError) as exc:
        print(f"[ERROR] cannot read OPML {args.opml}: {exc}", file=sys.stderr)
        return 1

    seen = load_seen_urls(args.pending)
    total_appended = 0
    total_broken = 0

    for src in sources:
        title = src["title"]
        if args.source and args.source.lower() not in title.lower():
            continue
        if title in _SKIP_TITLES:
            print(f"[SKIP] {title}")
            continue

        print(f"[FETCH] {title} ...", end=" ", flush=True)
        entries = _fetch_source_entries(src, args)

        # Update `seen` while filtering so a URL that occurs twice in the
        # same batch is queued only once.
        new = []
        for entry in entries:
            if entry[1] in seen:
                continue
            seen.add(entry[1])
            new.append(entry)
        print(f"{len(entries)} fetched, {len(new)} new")

        if new:
            added, broken = append_to_pending(
                args.pending,
                new,
                title,
                validate=not args.no_validate,
                dry_run=args.dry_run,
            )
            total_appended += added
            total_broken += broken

    print(f"\nDone: {total_appended} new entries appended, {total_broken} broken URLs marked [!]")
    return 0


if __name__ == "__main__":
    sys.exit(main())